#!/usr/bin/env python # # Ceph - scalable distributed file system # # Copyright (C) 2011 New Dream Network # # This is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License version 2.1, as published by the Free Software # Foundation. See file COPYING. # """ obsync.py: the object synchronizer """ from boto.s3.connection import OrdinaryCallingFormat from boto.s3.connection import SubdomainCallingFormat from boto.s3.connection import S3Connection from boto.s3.key import Key from optparse import OptionParser from sys import stderr from lxml import etree import base64 import boto import errno import hashlib import mimetypes import os from StringIO import StringIO import re import shutil import string import sys import tempfile import time import traceback import xattr try: import cloudfiles except ImportError: cloudfiles = None # Command-line options global opts # Translation table mapping users in the source to users in the destination. global xuser ###### Usage ####### USAGE = """ obsync synchronizes S3, swift and local objects. The source and destination can both be local or both remote. Examples: # copy contents of s3 bucket to disk SRC_AKEY=... SRC_SKEY=... \ obsync -v --src-type=s3 --src-host=myhost --src-bucket=mybucket \ --dst-type=file --dst-path=mydir # copy contents of mydir to a swift container DST_AKEY=... DST_SKEY=... \ obsync -v --src-type=file --src-path=mydir --dst-type=swift \ --dst-authurl=myauthurl --dst-bucket=mycontainer # synchronize two S3 buckets SRC_AKEY=... SRC_SKEY=... \ DST_AKEY=... DST_SKEY=... \ obsync -v --src-type=s3 --src-host=oldhost --src-bucket=mybucket \ --dst-type=s3 --dst-host=newhost --dst-bucket=mybucket \ --xuser bob=robert --xuser joe=joseph Note: You must specify an AWS access key and secret access key when accessing S3. When accessing swift use your username as the access key and your api key as the secret key. obsync honors these environment variables: SRC_AKEY Access key for the source URL SRC_SKEY Secret access key for the source URL DST_AKEY Access key for the destination URL DST_SKEY Secret access key for the destination URL SRC_SECURE If set, we'll use https to talk to the s3 source. DST_SECURE If set, we'll use https to talk to the s3 destination. AKEY Access key for both source and dest SKEY Secret access key for both source and dest DST_CONSISTENCY Set to 'eventual' if the destination is eventually consistent If these environment variables are not given, we will fall back on libboto defaults. Note: if python-cloudfiles is not installed, swift stores will not be able to be synced. """ ###### Constants ####### ACL_XATTR = "rados.acl" META_XATTR_PREFIX = "rados.meta." CONTENT_TYPE_XATTR = "rados.content_type" def vvprint(s): if (opts.more_verbose): print s ###### Exception classes ####### class ObsyncException(Exception): def __init__(self, ty, e): if (isinstance(e, str)): # from a string self.tb = "".join(traceback.format_stack()) self.comment = e else: # from another exception self.tb = traceback.format_exc(100000) self.comment = None self.ty = ty """ A temporary obsync exception. The user may want to retry the operation that failed. We can create one of these from a string or from another exception. """ class ObsyncTemporaryException(ObsyncException): def __init__(self, e): ObsyncException.__init__(self, "temporary", e) """ A permanent obsync exception. We can create one of these from a string or from another exception. """ class ObsyncPermanentException(ObsyncException): def __init__(self, e): ObsyncException.__init__(self, "permanent", e) """ An exception we encountered while parsing input arguments. """ class ObsyncArgumentParsingException(ObsyncException): def __init__(self, e): ObsyncException.__init__(self, "argument_parsing", e) """ Print out some exception information and exit the program. Normally, this function expects to see Obsync exceptions. If a random exception slips through, we treat it as an unknown error type. """ def print_obsync_exception_and_abort(e, origin): if (isinstance(e, ObsyncException)): if (isinstance(e, ObsyncArgumentParsingException)): print >>stderr, USAGE else: print >>stderr, e.tb if (e.comment != None): print >>stderr, e.comment print >>stderr, ("ERROR TYPE: %s, ORIGIN: %s" % (e.ty, origin)) sys.exit(1) else: traceback.print_exc(100000, stderr) print >>stderr, "ERROR TYPE: unknown, ORIGIN: %s" % origin sys.exit(1) ###### Extended Attributes ####### def test_xattr_support(path): test_file = path + "/$TEST" f = open(test_file, 'w') f.close try: xattr.set(test_file, "test", "123", namespace=xattr.NS_USER) if xattr.get(test_file, "test", namespace=xattr.NS_USER) != "123": raise ObsyncPermanentException("test_xattr_support: failed to set an xattr and " + \ "read it back.") except IOError, e: print >>stderr, "**** ERRROR: You do not appear to have xattr support " + \ "at %s ****" % pa raise ObsyncPermanentException(exc) finally: os.unlink(test_file) def xattr_is_metadata(k): # miscellaneous user-defined metadata if (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX): return True # content-type elif (k == CONTENT_TYPE_XATTR): return True return False ###### Helper functions ####### def mkdir_p(path): try: os.makedirs(path) except OSError, exc: if exc.errno != errno.EEXIST: raise ObsyncTemporaryException(exc) if (not os.path.isdir(path)): raise ObsyncTemporaryException(exc) def bytes_to_str(b): return ''.join(["%02x"% ord(x) for x in b]).strip() def get_md5(f, block_size=2**20): md5 = hashlib.md5() while True: data = f.read(block_size) if not data: break md5.update(data) return "%s" % md5.hexdigest() def strip_prefix(prefix, s): if not (s[0:len(prefix)] == prefix): return None return s[len(prefix):] def etag_to_md5(etag): if (etag[:1] == '"'): start = 1 else: start = 0 if (etag[-1:] == '"'): end = -1 else: end = None return etag[start:end] def getenv(a, b): if os.environ.has_key(a): return os.environ[a] elif b and os.environ.has_key(b): return os.environ[b] else: return None # Escaping functions. # # Valid names for local files are a little different than valid object # names for S3. So these functions are needed to translate. # # Basically, in local names, every sequence starting with a dollar sign is # reserved as a special escape sequence. If you want to create an S3 object # with a dollar sign in the name, the local file should have a double dollar # sign ($$). # # TODO: translate local files' control characters into escape sequences. # Most S3 clients (boto included) cannot handle control characters in S3 object # names. # TODO: check for invalid utf-8 in local file names. Ideally, escape it, but # if not, just reject the local file name. S3 object names must be valid # utf-8. # # ---------- ----------- # In S3 Locally # ---------- ----------- # foo/ foo$slash # # $money $$money # # obj-with-acl obj-with-acl # .obj-with-acl$acl def s3_name_to_local_name(s3_name): s3_name = re.sub(r'\$', "$$", s3_name) if (s3_name[-1:] == "/"): s3_name = s3_name[:-1] + "$slash" return s3_name def local_name_to_s3_name(local_name): local_name = re.sub(r'\$slash', "/", local_name) mre = re.compile("[$][^$]") if mre.match(local_name): raise ObsyncPermanentException("Local name contains a dollar sign escape \ sequence we don't understand.") local_name = re.sub(r'\$\$', "$", local_name) return local_name ###### ACLs ####### # for buckets: allow list # for object: allow grantee to read object data and metadata READ = 1 # for buckets: allow create, overwrite, or deletion of any object in the bucket WRITE = 2 # for buckets: allow grantee to read the bucket ACL # for objects: allow grantee to read the object ACL READ_ACP = 4 # for buckets: allow grantee to write the bucket ACL # for objects: allow grantee to write the object ACL WRITE_ACP = 8 # all of the above FULL_CONTROL = READ | WRITE | READ_ACP | WRITE_ACP ACL_TYPE_CANON_USER = "canon:" ACL_TYPE_EMAIL_USER = "email:" ACL_TYPE_GROUP = "group:" ALL_ACL_TYPES = [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ] S3_GROUP_AUTH_USERS = ACL_TYPE_GROUP + "AuthenticatedUsers" S3_GROUP_ALL_USERS = ACL_TYPE_GROUP + "AllUsers" S3_GROUP_LOG_DELIVERY = ACL_TYPE_GROUP + "LogDelivery" NS = "http://s3.amazonaws.com/doc/2006-03-01/" NS2 = "http://www.w3.org/2001/XMLSchema-instance" def get_user_type(utype): for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]: if utype[:len(ut)] == ut: return ut raise ObsyncPermanentException("unknown user type for user %s" % utype) def strip_user_type(utype): for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]: if utype[:len(ut)] == ut: return utype[len(ut):] raise ObsyncPermanentException("unknown user type for user %s" % utype) def grantee_attribute_to_user_type(utype): if (utype == "Canonical User"): return ACL_TYPE_CANON_USER elif (utype == "CanonicalUser"): return ACL_TYPE_CANON_USER elif (utype == "Group"): return ACL_TYPE_GROUP elif (utype == "Email User"): return ACL_TYPE_EMAIL_USER elif (utype == "EmailUser"): return ACL_TYPE_EMAIL_USER else: raise ObsyncPermanentException("unknown user type for user %s" % utype) def user_type_to_attr(t): if (t == ACL_TYPE_CANON_USER): return "CanonicalUser" elif (t == ACL_TYPE_GROUP): return "Group" elif (t == ACL_TYPE_EMAIL_USER): return "EmailUser" else: raise ObsyncPermanentException("unknown user type %s" % t) def add_user_type(user): """ All users that are not specifically marked as something else are treated as canonical users""" for atype in ALL_ACL_TYPES: if (user[:len(atype)] == atype): return user return ACL_TYPE_CANON_USER + user class AclGrant(object): def __init__(self, user_id, display_name, permission): self.user_id = user_id self.display_name = display_name self.permission = permission def translate_users(self, xusers): # Keep in mind that xusers contains user_ids of the form "type:value" # So typical contents might be like { canon:XYZ => canon.123 } if (xusers.has_key(self.user_id)): self.user_id = xusers[self.user_id] # It's not clear what the new pretty-name should be, so just leave it blank. self.display_name = None def equals(self, rhs): if (self.user_id != rhs.user_id): return False if (self.permission != rhs.permission): return False # ignore display_name return True class AclPolicy(object): def __init__(self, owner_id, owner_display_name, grants): self.owner_id = owner_id self.owner_display_name = owner_display_name self.grants = grants # dict of { string -> ACLGrant } @staticmethod def create_default(owner_id): grants = { } grants[ACL_TYPE_CANON_USER + owner_id] = \ AclGrant(ACL_TYPE_CANON_USER + owner_id, None, "FULL_CONTROL") return AclPolicy(owner_id, None, grants) @staticmethod def from_xml(s): root = etree.parse(StringIO(s)) owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS)) owner_id = owner_id_node.text owner_display_name_node = root.find("{%s}Owner/{%s}DisplayName" \ % (NS,NS)) if (owner_display_name_node != None): owner_display_name = owner_display_name_node.text else: owner_display_name = None grantlist = root.findall("{%s}AccessControlList/{%s}Grant" \ % (NS,NS)) grants = { } for g in grantlist: grantee = g.find("{%s}Grantee" % NS) user_id = grantee.find("{%s}ID" % NS).text if (grantee.attrib.has_key("type")): user_type = grantee.attrib["type"] else: user_type = grantee.attrib["{%s}type" % NS2] display_name_node = grantee.find("{%s}DisplayName" % NS) if (display_name_node != None): display_name = grantee.find("{%s}DisplayName" % NS).text else: display_name = None permission = g.find("{%s}Permission" % NS).text grant_user_id = grantee_attribute_to_user_type(user_type) + user_id grants[grant_user_id] = AclGrant(grant_user_id, display_name, permission) return AclPolicy(owner_id, owner_display_name, grants) def to_xml(self): root = etree.Element("AccessControlPolicy", nsmap={None: NS}) owner = etree.SubElement(root, "Owner") id_elem = etree.SubElement(owner, "ID") id_elem.text = self.owner_id if (self.owner_display_name and self.owner_display_name != ""): display_name_elem = etree.SubElement(owner, "DisplayName") display_name_elem.text = self.owner_display_name access_control_list = etree.SubElement(root, "AccessControlList") for k,g in self.grants.items(): grant_elem = etree.SubElement(access_control_list, "Grant") grantee_elem = etree.SubElement(grant_elem, "{%s}Grantee" % NS, nsmap={None: NS, "xsi" : NS2}) grantee_elem.set("{%s}type" % NS2, user_type_to_attr(get_user_type(g.user_id))) user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS) user_id_elem.text = strip_user_type(g.user_id) if (g.display_name != None): display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS) display_name_elem.text = g.display_name permission_elem = etree.SubElement(grant_elem, "{%s}Permission" % NS) permission_elem.text = g.permission return etree.tostring(root, encoding="UTF-8") def translate_users(self, xusers): # Owner ids are always expressed in terms of canonical user id if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)): self.owner_id = \ strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id]) self.owner_display_name = "" for k,g in self.grants.items(): g.translate_users(xusers) def get_all_users(self): """ Get a list of all user ids referenced in this ACL """ users = {} users[ACL_TYPE_CANON_USER + self.owner_id] = 1 for k,g in self.grants.items(): users[k] = 1 return users.keys() def set_owner(self, owner_id): self.owner_id = owner_id self.owner_display_name = "" def equals(self, rhs): if (self.owner_id != rhs.owner_id): return False for k,g in self.grants.items(): if (not rhs.grants.has_key(k)): return False if (not g.equals(rhs.grants[k])): return False for l,r in rhs.grants.items(): if (not self.grants.has_key(l)): return False if (not r.equals(self.grants[l])): return False return True def compare_xml(xml1, xml2): tree1 = etree.parse(StringIO(xml1)) out1 = etree.tostring(tree1, encoding="UTF-8") tree2 = etree.parse(StringIO(xml2)) out2 = etree.tostring(tree2, encoding="UTF-8") out1 = out1.replace("xsi:type", "type") out2 = out2.replace("xsi:type", "type") if out1 != out2: print "out1 = %s" % out1 print "out2 = %s" % out2 raise ObsyncPermanentException("compare xml failed") # def test_acl_policy(): test1_xml = \ "" + \ "fooMrFoo" + \ "bar" + \ "display-name" + \ "FULL_CONTROL" test1 = AclPolicy.from_xml(test1_xml) compare_xml(test1_xml, test1.to_xml()) ###### Object ####### class Object(object): def __init__(self, name, md5, size, meta): self.name = name self.md5 = md5 self.size = int(size) self.meta = meta def equals(self, rhs): if (self.name != rhs.name): vvprint("EQUALS: self.name = %s, rhs.name = %s" % (self.name, rhs.name)) return False if (self.md5 != rhs.md5): vvprint("EQUALS: self.md5 = %s, rhs.md5 = %s" % (self.md5, rhs.md5)) return False if (self.size != rhs.size): vvprint("EQUALS: self.size = %d, rhs.size = %d" % (self.size, rhs.size)) return False for k,v in self.meta.items(): if (not rhs.meta.has_key(k)): vvprint("EQUALS: rhs.meta lacks key %s" % k) return False if (rhs.meta[k] != v): vvprint("EQUALS: self.meta[%s] = %s, rhs.meta[%s] = %s" % \ (k, v, k, rhs.meta[k])) return False for k,v in rhs.meta.items(): if (not self.meta.has_key(k)): vvprint("EQUALS: self.meta lacks key %s" % k) return False vvprint("EQUALS: the objects are equal.") return True def local_name(self): return s3_name_to_local_name(self.name) def local_path(self, base): return base + "/" + s3_name_to_local_name(self.name) @staticmethod def from_file(obj_name, path): f = open(path, 'r') try: md5 = get_md5(f) finally: f.close() size = os.path.getsize(path) meta = {} try: xlist = xattr.get_all(path, namespace=xattr.NS_USER) except IOError, e: if e.errno == 2: return meta else: raise ObsyncTemporaryException(e) for k,v in xlist: if xattr_is_metadata(k): meta[k] = v #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size) return Object(obj_name, md5, size, meta) ###### Store ####### class Store(object): @staticmethod def make_store(type, is_dst, create, akey, skey, **kwargs): if (type == 's3'): if (is_dst): is_secure = os.environ.has_key("DST_SECURE") else: is_secure = os.environ.has_key("SRC_SECURE") return S3Store(kwargs['host'], kwargs['bucket'], kwargs['prefix'], create, akey, skey, is_secure) elif (type == 'swift'): return SwiftStore(kwargs['host'], kwargs['bucket'], kwargs['prefix'], create, akey, skey, False) elif (type == 'file'): return FileStore(kwargs['path'], create) raise ObsyncPermanentException('Unknown store type: "%s"' %type) ###### LocalCopy ###### class LocalCopy(object): def __init__(self, obj_name, path, path_is_temp): self.obj_name = obj_name self.path = path self.path_is_temp = path_is_temp def remove(self): if ((self.path_is_temp == True) and (self.path != None)): os.unlink(self.path) self.path = None self.path_is_temp = False def __del__(self): self.remove() class LocalAcl(object): @staticmethod def from_xml(obj_name, xml): acl_policy = AclPolicy.from_xml(xml) return LocalAcl(obj_name, acl_policy) @staticmethod def get_empty(obj_name): return LocalAcl(obj_name, None) def __init__(self, obj_name, acl_policy): self.obj_name = obj_name self.acl_policy = acl_policy def equals(self, rhs): """ Compare two LocalAcls """ if (self.acl_policy == None): return (rhs.acl_policy == None) if (rhs.acl_policy == None): return (self.acl_policy == None) return self.acl_policy.equals(rhs.acl_policy) def translate_users(self, xusers): """ Translate the users in this ACL """ if (self.acl_policy == None): return self.acl_policy.translate_users(xusers) def set_owner(self, owner_id): if (self.acl_policy == None): return self.acl_policy.set_owner(owner_id) def write_to_xattr(self, file_name): """ Write this ACL to an extended attribute """ if (self.acl_policy == None): return xml = self.acl_policy.to_xml() xattr.set(file_name, ACL_XATTR, xml, namespace=xattr.NS_USER) ###### S3 store ####### def s3_key_to_meta(k): meta = {} if (k.__dict__.has_key("content_type")): meta[CONTENT_TYPE_XATTR] = k.content_type for k,v in k.metadata.items(): meta[META_XATTR_PREFIX + k] = v return meta def meta_to_s3_key(key, meta): for k,v in meta.items(): if (k == CONTENT_TYPE_XATTR): key.set_metadata("Content-Type", v) elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX): k_name = k[len(META_XATTR_PREFIX):] key.set_metadata(k_name, v) else: raise ObsyncPermanentException("can't understand meta entry: %s" % k) class S3StoreIterator(object): """S3Store iterator""" def __init__(self, bucket, blrs): self.bucket = bucket self.blrs = blrs def __iter__(self): return self def next(self): # This will raise StopIteration when there are no more objects to # iterate on key = self.blrs.next() # Issue a HEAD request to get content-type and other metadata k = self.bucket.get_key(key.name) ret = Object(key.name, etag_to_md5(key.etag), key.size, s3_key_to_meta(k)) return ret class S3Store(Store): def __init__(self, host, bucket_name, object_prefix, create, akey, skey, is_secure): self.host = host self.bucket_name = bucket_name self.key_prefix = object_prefix self.conn = S3Connection(calling_format=OrdinaryCallingFormat(), host=self.host, is_secure=is_secure, aws_access_key_id=akey, aws_secret_access_key=skey) self.bucket = self.conn.lookup(self.bucket_name) if (self.bucket == None): if (create): if (opts.dry_run): raise ObsyncPermanentException("logic error: this should be unreachable.") self.bucket = self.conn.create_bucket(bucket_name = self.bucket_name) else: # Hrm, try again using Subdomain method. This works # around PermanentRedirect. self.conn = S3Connection(calling_format=SubdomainCallingFormat(), host=self.host, is_secure=is_secure, aws_access_key_id=akey, aws_secret_access_key=skey) self.bucket = self.conn.lookup(self.bucket_name) if (self.bucket == None): raise ObsyncPermanentException("%s: no such bucket as %s" % \ (host, self.bucket_name)) def __str__(self): return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix def get_acl(self, obj): acl_xml = self.bucket.get_xml_acl(obj.name) return LocalAcl.from_xml(obj.name, acl_xml) def make_local_copy(self, obj): k = Key(self.bucket) k.key = obj.name temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name try: k.get_contents_to_filename(temp_file) except Exception, e: os.unlink(temp_file) raise ObsyncTemporaryException(e) return LocalCopy(obj.name, temp_file, True) def all_objects(self): blrs = self.bucket.list(prefix = self.key_prefix) return S3StoreIterator(self.bucket, blrs.__iter__()) def locate_object(self, obj): k = self.bucket.get_key(obj.name) if (k == None): return None return Object(obj.name, etag_to_md5(k.etag), k.size, s3_key_to_meta(k)) def upload(self, local_copy, src_acl, obj): if (opts.more_verbose): print "S3Store.UPLOAD: local_copy.path='" + local_copy.path + "' " + \ "obj='" + obj.name + "'" if (opts.dry_run): return k = Key(self.bucket) k.key = obj.name meta_to_s3_key(k, obj.meta) k.set_contents_from_filename(local_copy.path) if (src_acl.acl_policy != None): xml = src_acl.acl_policy.to_xml() try: def fn(): self.bucket.set_xml_acl(xml, k) do_with_s3_retries(fn) except boto.exception.S3ResponseError, e: print >>stderr, "ERROR SETTING ACL on object '" + sobj.name + "'" print >>stderr print >>stderr, "************* ACL: *************" print >>stderr, str(xml) print >>stderr, "********************************" raise ObsyncTemporaryException(e) def remove(self, obj): if (opts.dry_run): return self.bucket.delete_key(obj.name) if (opts.more_verbose): print "S3Store: removed %s" % obj.name # Some S3 servers offer "eventual consistency." # What this means is that after a change has been made, like the creation of an # object, it takes some time for this change to become visible to everyone. # This potentially includes the client making the change. # # This means we need to implement a retry mechanism for certain operations. # For example, setting the ACL on a newly created object may fail with an # "object not found" error if the object creation hasn't yet become visible to # us. def do_with_s3_retries(fn): if (os.environ.has_key("DST_CONSISTENCY") and os.environ["DST_CONSISTENCY"] == "eventual"): sleep_times = [5, 10, 60, -1] else: sleep_times = [-1] for stime in sleep_times: try: fn() return except boto.exception.S3ResponseError, e: if (stime == -1): raise ObsyncTemporaryException(e) if (opts.verbose): print "encountered s3 response error: ", if (opts.more_verbose): print str(e) + ": ", if (opts.verbose): print "retrying operation after " + str(stime) + \ " second delay" time.sleep(stime) ###### Swift store ####### def swift_object_to_meta(obj): meta = {} if (obj.__dict__.has_key("content_type")): meta[CONTENT_TYPE_XATTR] = obj.content_type for k,v in obj.metadata.items(): meta[META_XATTR_PREFIX + k] = v return meta def meta_to_swift_object(obj, meta): for k,v in meta.items(): if (k == CONTENT_TYPE_XATTR): obj.metadata["Content-Type"] = v elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX): k_name = k[len(META_XATTR_PREFIX):] obj.metadata[k_name] = v else: raise ObsyncPermanentException("can't understand meta entry: %s" % k) obj.sync_metadata() class SwiftStoreIterator(object): """SwiftStore iterator""" def __init__(self, container, object_list): self.container = container self.generator = (o for o in object_list) def __iter__(self): return self def next(self): # This will raise StopIteration when there are no more objects to # iterate on obj = self.generator.next() obj = self.container.get_object(obj.name) ret = Object(obj.name, etag_to_md5(obj._etag), obj.size, swift_object_to_meta(obj)) return ret class SwiftStore(Store): def __init__(self, authurl, container_name, object_prefix, create, akey, skey, is_secure): self.host = authurl self.container_name = container_name self.object_prefix = object_prefix self.conn = cloudfiles.get_connection(username=akey, api_key=skey, authurl=self.host) try: self.container = self.conn.get_container(self.container_name) except cloudfiles.errors.NoSuchContainer: self.container = None if (self.container == None): if (create): if (opts.dry_run): raise ObsyncPermanentException("logic error: this should be unreachable.") self.container = self.conn.create_container(self.container_name) else: raise ObsyncPermanentException("%s: no such container as %s" % \ (self.host, self.container_name)) def __str__(self): return "swift://" + self.host + "/" + self.container_name + "/" + self.object_prefix def get_acl(self, obj): raise NotImplementedError # We always use --no-preserve-acls right now def make_local_copy(self, obj): o = self.container.get_object(obj.name) temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name try: o.save_to_filename(temp_file) except Exception, e: os.unlink(temp_file) raise ObsyncTemporaryException(e) return LocalCopy(obj.name, temp_file, True) def all_objects(self): object_list = self.container.get_objects(prefix = self.object_prefix) return SwiftStoreIterator(self.container, object_list) def locate_object(self, obj): try: o = self.container.get_object(obj.name) except cloudfiles.errors.NoSuchObject: return None return Object(obj.name, etag_to_md5(o._etag), o.size, swift_object_to_meta(o)) def upload(self, local_copy, src_acl, obj): if (opts.more_verbose): print "SwiftStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \ "obj='" + obj.name + "'" if (opts.dry_run): return o = self.container.create_object(obj.name) o.load_from_filename(local_copy.path) meta_to_swift_object(o, obj.meta) if (src_acl.acl_policy != None): #FIXME: no acls for swift yet pass def remove(self, obj): if (opts.dry_run): return self.container.delete_object(obj.name) if (opts.more_verbose): print "SwiftStore: removed %s" % obj.name ###### FileStore ####### class FileStoreIterator(object): """FileStore iterator""" def __init__(self, base): self.base = base if (opts.follow_symlinks): self.generator = os.walk(base, followlinks=True) else: self.generator = os.walk(base) self.path = "" self.files = [] def __iter__(self): return self def next(self): while True: if (len(self.files) == 0): self.path, dirs, self.files = self.generator.next() continue path = self.path + "/" + self.files[0] self.files = self.files[1:] # Ignore non-files when iterating. if (not os.path.isfile(path)): continue obj_name = local_name_to_s3_name(path[len(self.base)+1:]) return Object.from_file(obj_name, path) class FileStore(Store): def __init__(self, url, create): # Parse the file url self.base = url if (self.base[-1:] == '/'): self.base = self.base[:-1] if (create): if (opts.dry_run): raise ObsyncPermanentException("logic error: this should be unreachable.") mkdir_p(self.base) elif (not os.path.isdir(self.base)): raise ObsyncPermanentException("NonexistentStore") Store.__init__(self, "file://" + url) test_xattr_support(self.base) def __str__(self): return "file://" + self.base def get_acl(self, obj): try: xml = xattr.get(obj.local_path(self.base), ACL_XATTR, namespace=xattr.NS_USER) except IOError, e: #print "failed to get XML ACL from %s" % obj.local_name() if e.errno == 61: return LocalAcl.get_empty(obj.name) raise ObsyncPermanentException(e) return LocalAcl.from_xml(obj.name, xml) def make_local_copy(self, obj): return LocalCopy(obj.name, obj.local_path(self.base), False) def all_objects(self): return FileStoreIterator(self.base) def locate_object(self, obj): path = obj.local_path(self.base) found = os.path.isfile(path) if (opts.more_verbose): if (found): print "FileStore::locate_object: found object '" + \ obj.name + "'" else: print "FileStore::locate_object: did not find object '" + \ obj.name + "'" if (not found): return None return Object.from_file(obj.name, path) def upload(self, local_copy, src_acl, obj): if (opts.more_verbose): print "FileStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \ "obj='" + obj.name + "'" if (opts.dry_run): return s = local_copy.path lname = obj.local_name() d = self.base + "/" + lname mkdir_p(os.path.dirname(d)) shutil.copy(s, d) src_acl.write_to_xattr(d) # Store metadata in extended attributes for k,v in obj.meta.items(): xattr.set(d, k, v, namespace=xattr.NS_USER) def remove(self, obj): if (opts.dry_run): return os.unlink(self.base + "/" + obj.name) if (opts.more_verbose): print "FileStore: removed %s" % obj.name ###### Functions ####### def delete_unreferenced(src, dst): """ delete everything from dst that is not referenced in src """ if (opts.more_verbose): print "handling deletes." currently_handling = "destination" try: dst_all_objects = dst.all_objects() while True: dobj = dst_all_objects.next() currently_handling = "source" sobj = src.locate_object(dobj) currently_handling = "destination" if (sobj == None): dst.remove(dobj) except StopIteration: pass except Exception, e: print_obsync_exception_and_abort(e, currently_handling) def xuser_cb(opt, opt_str, value, parser): """ handle an --xuser argument """ equals = value.find(r'=') if equals == -1: raise ObsyncArgumentParsingException("Error parsing --xuser: You must \ give both a source and destination user name, like so:\n\ --xuser SOURCE_USER=DEST_USER\n\ \n\ This will translate the user SOURCE_USER in the source to the user DEST_USER \n\ in the destination.") src_user = value[:equals] dst_user = value[equals+1:] if ((len(src_user) == 0) or (len(dst_user) == 0)): raise ObsyncArgumentParsingException("Error parsing --xuser: \ can't have a zero-length user name.") src_user = add_user_type(src_user) dst_user = add_user_type(dst_user) if (xuser.has_key(src_user)): raise ObsyncArgumentParsingException("Error parsing --xuser: \ we are already translating \"%s\" to \"%s\"; we cannot translate it \ to \"%s\"" % (src_user, xuser[src_user], dst_user)) xuser[src_user] = dst_user ############Begin############# try: currently_handling = "unknown" parser = OptionParser(USAGE) parser.add_option("--src-type", dest="src_type", type="string", action="store", help="type of data store for source. Can be " + \ "'file', 's3', or 'swift'") parser.add_option("--src-path", dest="src_path", type="string", action="store", help="local file path for source") parser.add_option("--src-host", dest="src_host", type="string", action="store", help="host to connect to for source data") parser.add_option("--src-authurl", dest="src_authurl", type="string", action="store", help="authurl to connect to for source data") parser.add_option("--src-bucket", dest="src_bucket", type="string", action="store", help="source bucket") parser.add_option("--src-prefix", dest="src_prefix", type="string", action="store", help="object prefix for the source") parser.add_option("--dst-type", dest="dst_type", type="string", action="store", help="type of data store for destination. Can be " + \ "'file', 's3', or 'swift'") parser.add_option("--dst-path", dest="dst_path", type="string", action="store", help="local file path for destination") parser.add_option("--dst-host", dest="dst_host", type="string", action="store", help="host to connect to for destination data") parser.add_option("--dst-authurl", dest="dst_authurl", type="string", action="store", help="authurl to connect to for destination data") parser.add_option("--dst-bucket", dest="dst_bucket", type="string", action="store", help="destination bucket") parser.add_option("--dst-prefix", dest="dst_prefix", type="string", action="store", help="object prefix for the destination") parser.add_option("-n", "--dry-run", action="store_true", \ dest="dry_run", default=False) parser.add_option("-c", "--create-dest", action="store_true", \ dest="create", help="create the destination if it doesn't already exist") parser.add_option("--delete-before", action="store_true", \ dest="delete_before", help="delete objects that aren't in SOURCE from \ DESTINATION before transferring any objects") parser.add_option("--boto-retries", dest="boto_retries", type="int", help="set number of times we'll retry the same S3 operation") parser.add_option("-d", "--delete-after", action="store_true", \ dest="delete_after", help="delete objects that aren't in SOURCE from \ DESTINATION after doing all transfers.") parser.add_option("-L", "--follow-symlinks", action="store_true", \ dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \ "loops when using this option!)") parser.add_option("--no-preserve-acls", action="store_true", \ dest="no_preserve_acls", help="don't preserve ACLs when copying objects.") parser.add_option("-v", "--verbose", action="store_true", \ dest="verbose", help="be verbose") parser.add_option("-V", "--more-verbose", action="store_true", \ dest="more_verbose", help="be really, really verbose (developer mode)") parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \ dest="SRC=DST", callback=xuser_cb, help="set up a user tranlation. You \ can specify multiple user translations with multiple --xuser arguments.") parser.add_option("--force", action="store_true", \ dest="force", help="overwrite all destination objects, even if they \ appear to be the same as the source objects.") parser.add_option("--unit", action="store_true", \ dest="run_unit_tests", help="run unit tests and quit") xuser = {} (opts, args) = parser.parse_args() if (opts.run_unit_tests): test_acl_policy() sys.exit(0) if opts.boto_retries != None: if not boto.config.has_section('Boto'): boto.config.add_section('Boto') boto.config.set('Boto', 'num_retries', str(opts.boto_retries)) if (not opts.src_type): raise ObsyncArgumentParsingException("src-type is required!") if (opts.src_type == 'file'): if (opts.src_host or opts.src_bucket or opts.src_prefix or opts.src_authurl): raise ObsyncArgumentParsingException("host, bucket, prefix, and " + \ "authurl not required for local file stores.") if (not opts.src_path): raise ObsyncArgumentParsingException("src-path is required!") else: if (opts.src_type == 's3'): if (opts.src_authurl): raise ObsyncArgumentParsingException("src-authurl is not used for s3 stores!") if (not opts.src_host): raise ObsyncArgumentParsingException("src-host is required for s3 stores!") if (opts.src_type == 'swift'): if not cloudfiles: raise ObsyncArgumentParsingException("cloudfiles library not found. Swift syncing unavailable") if (not opts.src_authurl): raise ObsyncArgumentParsingException("src-authurl is required for swift stores!") if (opts.src_host): raise ObsyncArgumentParsingException("src-host is not used for swift stores!") if (not opts.src_bucket): raise ObsyncArgumentParsingException("src-bucket is required!") if (not opts.dst_type): raise ObsyncArgumentParsingException("dst-type is required!") if (opts.dst_type == 'file'): if (opts.dst_host or opts.dst_bucket or opts.dst_prefix or opts.dst_authurl): raise ObsyncArgumentParsingException("host, bucket, prefix, and " + \ "authurl not required for local file stores.") if (not opts.dst_path): raise ObsyncArgumentParsingException("dst-path is required!") else: if (opts.dst_type == 's3'): if (opts.dst_authurl): raise ObsyncArgumentParsingException("dst-authurl is not used for s3 stores!") if (not opts.dst_host): raise ObsyncArgumentParsingException("dst-host is required for s3 stores!") if (opts.dst_type == 'swift'): if not cloudfiles: raise ObsyncArgumentParsingException("cloudfiles library not found. Swift syncing unavailable") if (not opts.dst_authurl): raise ObsyncArgumentParsingException("dst-authurl is required for swift stores!") if (opts.dst_host): raise ObsyncArgumentParsingException("dst-host is not used for swift stores!") if (not opts.dst_bucket): raise ObsyncArgumentParsingException("dst-bucket is required!") if opts.src_authurl: opts.src_host = opts.src_authurl if opts.dst_authurl: opts.dst_host = opts.dst_authurl opts.preserve_acls = not opts.no_preserve_acls if (opts.create and opts.dry_run): raise ObsyncArgumentParsingException("You can't run with both \ --create-dest and --dry-run! By definition, a dry run never changes anything.") if (opts.more_verbose): print >>stderr, "User translations:" for k,v in xuser.items(): print >>stderr, "\"%s\" ==> \"%s\"" % (k, v) print >>stderr, "" if (opts.more_verbose): opts.verbose = True boto.set_stream_logger("stdout") boto.log.info("Enabling verbose boto logging.") if (opts.delete_before and opts.delete_after): raise ObsyncArgumentParsingException("It doesn't make sense to \ specify both --delete-before and --delete-after.") currently_handling = "source" if (opts.more_verbose): print "SOURCE: %s, %s, %s (%s)" % (opts.src_host, opts.src_bucket, opts.src_prefix, opts.src_type) try: src = Store.make_store(opts.src_type, False, False, getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"), host=opts.src_host, bucket=opts.src_bucket, prefix=opts.src_prefix, path=opts.src_path) except ObsyncException, e: if (e.comment == "NonexistentStore"): e.comment = "Fatal error: Source " + opts.src_host + " does " +\ "not appear to exist." raise currently_handling = "destination" if (opts.more_verbose): print "DESTINATION: %s, %s, %s (%s)" % (opts.dst_host, opts.dst_bucket, opts.dst_prefix, opts.dst_type) try: dst = Store.make_store(opts.dst_type, True, opts.create, getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"), host=opts.dst_host, bucket=opts.dst_bucket, prefix=opts.dst_prefix, path=opts.dst_path) except ObsyncException, e: if (e.comment == "NonexistentStore"): e.comment = "Fatal error: Destination does not exist. Run " + \ "with -c or --create-dest to create it automatically." raise if (opts.delete_before): delete_unreferenced(src, dst) currently_handling = "source" src_all_objects = src.all_objects() while True: currently_handling = "source" try: sobj = src_all_objects.next() except StopIteration: break if (opts.more_verbose): print "handling " + sobj.name pline = "" currently_handling = "destination" dobj = dst.locate_object(sobj) upload = False src_acl = None dst_acl = None if (opts.force): if (opts.verbose): pline += "F " + sobj.name upload = True elif (dobj == None): if (opts.verbose): pline += "+ " + sobj.name upload = True elif not sobj.equals(dobj): if (opts.verbose): pline += "> " + sobj.name upload = True elif (opts.preserve_acls): # Do the ACLs match? currently_handling = "source" src_acl = src.get_acl(sobj) currently_handling = "destination" dst_acl = dst.get_acl(dobj) currently_handling = "source" src_acl.translate_users(xuser) #src_acl.set_owner() if (not src_acl.equals(dst_acl)): upload = True if (opts.verbose): pline += "^ %s" % sobj.name else: if (opts.verbose): pline += ". " + sobj.name if (upload): if (not opts.preserve_acls): # Just default to an empty ACL src_acl = LocalAcl.get_empty(sobj.name) else: if (src_acl == None): currently_handling = "source" src_acl = src.get_acl(sobj) src_acl.translate_users(xuser) #src_acl.set_owner() currently_handling = "source" local_copy = src.make_local_copy(sobj) try: currently_handling = "destination" dst.upload(local_copy, src_acl, sobj) finally: local_copy.remove() if (pline != ""): print pline if (opts.delete_after): delete_unreferenced(src, dst) except Exception, e: print_obsync_exception_and_abort(e, currently_handling) if (opts.more_verbose): print "finished." sys.exit(0)