summaryrefslogtreecommitdiff
path: root/src/obsync/obsync
blob: 9f8facca141dcf6afcb9289f4393a1013ce1a86b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
#!/usr/bin/env python

#
# Ceph - scalable distributed file system
#
# Copyright (C) 2011 New Dream Network
#
# This is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1, as published by the Free Software
# Foundation.  See file COPYING.
#

"""
obsync.py: the object synchronizer
"""

from boto.s3.connection import OrdinaryCallingFormat
from boto.s3.connection import SubdomainCallingFormat
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from optparse import OptionParser
from sys import stderr
from lxml import etree
import base64
import boto
import errno
import hashlib
import mimetypes
import os
from StringIO import StringIO
import re
import shutil
import string
import sys
import tempfile
import time
import traceback
import xattr

try:
    import cloudfiles
except ImportError:
    cloudfiles = None

# Command-line options
global opts

# Translation table mapping users in the source to users in the destination.
global xuser

###### Usage #######
USAGE = """
obsync synchronizes S3, swift and local objects. The source and destination
can both be local or both remote.

Examples:
# copy contents of s3 bucket to disk
SRC_AKEY=... SRC_SKEY=... \
obsync -v --src-type=s3 --src-host=myhost --src-bucket=mybucket \
--dst-type=file --dst-path=mydir

# copy contents of mydir to a swift container
DST_AKEY=... DST_SKEY=... \
obsync -v --src-type=file --src-path=mydir --dst-type=swift \
--dst-authurl=myauthurl --dst-bucket=mycontainer

# synchronize two S3 buckets
SRC_AKEY=... SRC_SKEY=... \
DST_AKEY=... DST_SKEY=... \
obsync -v --src-type=s3 --src-host=oldhost --src-bucket=mybucket \
--dst-type=s3 --dst-host=newhost --dst-bucket=mybucket \
--xuser bob=robert --xuser joe=joseph

Note: You must specify an AWS access key and secret access key when accessing
S3. When accessing swift use your username as the access key and your api
key as the secret key. obsync honors these environment variables:
SRC_AKEY          Access key for the source URL
SRC_SKEY          Secret access key for the source URL
DST_AKEY          Access key for the destination URL
DST_SKEY          Secret access key for the destination URL
SRC_SECURE        If set, we'll use https to talk to the s3 source.
DST_SECURE        If set, we'll use https to talk to the s3 destination.
AKEY              Access key for both source and dest
SKEY              Secret access key for both source and dest
DST_CONSISTENCY   Set to 'eventual' if the destination is eventually consistent

If these environment variables are not given, we will fall back on libboto
defaults.

Note: if python-cloudfiles is not installed, swift stores will not be able
to be synced.
"""
###### Constants #######
ACL_XATTR = "rados.acl"
META_XATTR_PREFIX = "rados.meta."
CONTENT_TYPE_XATTR = "rados.content_type"

def vvprint(s):
    if (opts.more_verbose):
        print s

###### Exception classes #######
class ObsyncException(Exception):
    def __init__(self, ty, e):
        if (isinstance(e, str)):
            # from a string
            self.tb = "".join(traceback.format_stack())
            self.comment = e
        else:
            # from another exception
            self.tb = traceback.format_exc(100000)
            self.comment = None
        self.ty = ty

""" A temporary obsync exception.
The user may want to retry the operation that failed.
We can create one of these from a string or from another exception.
"""
class ObsyncTemporaryException(ObsyncException):
    def __init__(self, e):
        ObsyncException.__init__(self, "temporary", e)

""" A permanent obsync exception.
We can create one of these from a string or from another exception.
"""
class ObsyncPermanentException(ObsyncException):
    def __init__(self, e):
        ObsyncException.__init__(self, "permanent", e)

""" An exception we encountered while parsing input arguments.
"""
class ObsyncArgumentParsingException(ObsyncException):
    def __init__(self, e):
        ObsyncException.__init__(self, "argument_parsing", e)

""" Print out some exception information and exit the program.
Normally, this function expects to see Obsync exceptions. If a random exception
slips through, we treat it as an unknown error type.
"""
def print_obsync_exception_and_abort(e, origin):
    if (isinstance(e, ObsyncException)):
        if (isinstance(e, ObsyncArgumentParsingException)):
            print >>stderr, USAGE
        else:
            print >>stderr, e.tb
        if (e.comment != None):
            print >>stderr, e.comment
        print >>stderr, ("ERROR TYPE: %s, ORIGIN: %s" % (e.ty, origin))
        sys.exit(1)
    else:
        traceback.print_exc(100000, stderr)
        print >>stderr, "ERROR TYPE: unknown, ORIGIN: %s" % origin
        sys.exit(1)

###### Extended Attributes #######
def test_xattr_support(path):
    test_file = path + "/$TEST"
    f = open(test_file, 'w')
    f.close
    try:
        xattr.set(test_file, "test", "123", namespace=xattr.NS_USER)
        if xattr.get(test_file, "test", namespace=xattr.NS_USER) !=  "123":
            raise ObsyncPermanentException("test_xattr_support: failed to set an xattr and " + \
                "read it back.")
    except IOError, e:
        print >>stderr, "**** ERRROR: You do not appear to have xattr support " + \
            "at %s ****" % pa
        raise ObsyncPermanentException(exc)
    finally:
        os.unlink(test_file)

def xattr_is_metadata(k):
    # miscellaneous user-defined metadata
    if (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
        return True
    # content-type
    elif (k == CONTENT_TYPE_XATTR):
        return True
    return False

###### Helper functions #######
def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError, exc:
        if exc.errno != errno.EEXIST:
            raise ObsyncTemporaryException(exc)
        if (not os.path.isdir(path)):
            raise ObsyncTemporaryException(exc)

def bytes_to_str(b):
    return ''.join(["%02x"% ord(x) for x in b]).strip()

def get_md5(f, block_size=2**20):
    md5 = hashlib.md5()
    while True:
        data = f.read(block_size)
        if not data:
            break
        md5.update(data)
    return "%s" % md5.hexdigest()

def strip_prefix(prefix, s):
    if not (s[0:len(prefix)] == prefix):
        return None
    return s[len(prefix):]

def etag_to_md5(etag):
    if (etag[:1] == '"'):
        start = 1
    else:
        start = 0
    if (etag[-1:] == '"'):
        end = -1
    else:
        end = None
    return etag[start:end]

def getenv(a, b):
    if os.environ.has_key(a):
        return os.environ[a]
    elif b and os.environ.has_key(b):
        return os.environ[b]
    else:
        return None

# Escaping functions.
#
# Valid names for local files are a little different than valid object
# names for S3. So these functions are needed to translate.
#
# Basically, in local names, every sequence starting with a dollar sign is
# reserved as a special escape sequence. If you want to create an S3 object
# with a dollar sign in the name, the local file should have a double dollar
# sign ($$).
#
# TODO: translate local files' control characters into escape sequences.
# Most S3 clients (boto included) cannot handle control characters in S3 object
# names.
# TODO: check for invalid utf-8 in local file names. Ideally, escape it, but
# if not, just reject the local file name. S3 object names must be valid
# utf-8.
#
# ----------        -----------
# In S3             Locally
# ----------        -----------
# foo/              foo$slash
#
# $money            $$money
#
# obj-with-acl      obj-with-acl
#                  .obj-with-acl$acl
def s3_name_to_local_name(s3_name):
    s3_name = re.sub(r'\$', "$$", s3_name)
    if (s3_name[-1:] == "/"):
        s3_name = s3_name[:-1] + "$slash"
    return s3_name

def local_name_to_s3_name(local_name):
    local_name = re.sub(r'\$slash', "/", local_name)
    mre = re.compile("[$][^$]")
    if mre.match(local_name):
        raise ObsyncPermanentException("Local name contains a dollar sign escape \
sequence we don't understand.")
    local_name = re.sub(r'\$\$', "$", local_name)
    return local_name

###### ACLs #######

# for buckets: allow list
# for object: allow grantee to read object data and metadata
READ = 1

# for buckets: allow create, overwrite, or deletion of any object in the bucket
WRITE = 2

# for buckets: allow grantee to read the bucket ACL
# for objects: allow grantee to read the object ACL
READ_ACP = 4

# for buckets: allow grantee to write the bucket ACL
# for objects: allow grantee to write the object ACL
WRITE_ACP = 8

# all of the above
FULL_CONTROL = READ | WRITE | READ_ACP | WRITE_ACP

ACL_TYPE_CANON_USER = "canon:"
ACL_TYPE_EMAIL_USER = "email:"
ACL_TYPE_GROUP = "group:"
ALL_ACL_TYPES = [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]

S3_GROUP_AUTH_USERS = ACL_TYPE_GROUP +  "AuthenticatedUsers"
S3_GROUP_ALL_USERS = ACL_TYPE_GROUP +  "AllUsers"
S3_GROUP_LOG_DELIVERY = ACL_TYPE_GROUP +  "LogDelivery"

NS = "http://s3.amazonaws.com/doc/2006-03-01/"
NS2 = "http://www.w3.org/2001/XMLSchema-instance"

def get_user_type(utype):
    for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
        if utype[:len(ut)] == ut:
            return ut
    raise ObsyncPermanentException("unknown user type for user %s" % utype)

def strip_user_type(utype):
    for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
        if utype[:len(ut)] == ut:
            return utype[len(ut):]
    raise ObsyncPermanentException("unknown user type for user %s" % utype)

def grantee_attribute_to_user_type(utype):
    if (utype == "Canonical User"):
        return ACL_TYPE_CANON_USER
    elif (utype == "CanonicalUser"):
        return ACL_TYPE_CANON_USER
    elif (utype == "Group"):
        return ACL_TYPE_GROUP
    elif (utype == "Email User"):
        return ACL_TYPE_EMAIL_USER
    elif (utype == "EmailUser"):
        return ACL_TYPE_EMAIL_USER
    else:
        raise ObsyncPermanentException("unknown user type for user %s" % utype)

def user_type_to_attr(t):
    if (t == ACL_TYPE_CANON_USER):
        return "CanonicalUser"
    elif (t == ACL_TYPE_GROUP):
        return "Group"
    elif (t ==  ACL_TYPE_EMAIL_USER):
        return "EmailUser"
    else:
        raise ObsyncPermanentException("unknown user type %s" % t)

def add_user_type(user):
    """ All users that are not specifically marked as something else
are treated as canonical users"""
    for atype in ALL_ACL_TYPES:
        if (user[:len(atype)] == atype):
            return user
    return ACL_TYPE_CANON_USER + user

class AclGrant(object):
    def __init__(self, user_id, display_name, permission):
        self.user_id = user_id
        self.display_name = display_name
        self.permission = permission
    def translate_users(self, xusers):
        # Keep in mind that xusers contains user_ids of the form "type:value"
        # So typical contents might be like { canon:XYZ => canon.123 }
        if (xusers.has_key(self.user_id)):
            self.user_id = xusers[self.user_id]
            # It's not clear what the new pretty-name should be, so just leave it blank.
            self.display_name = None
    def equals(self, rhs):
        if (self.user_id != rhs.user_id):
            return False
        if (self.permission != rhs.permission):
            return False
        # ignore display_name
        return True

class AclPolicy(object):
    def __init__(self, owner_id, owner_display_name, grants):
        self.owner_id = owner_id
        self.owner_display_name = owner_display_name
        self.grants = grants  # dict of { string -> ACLGrant }
    @staticmethod
    def create_default(owner_id):
        grants = { }
        grants[ACL_TYPE_CANON_USER + owner_id] = \
            AclGrant(ACL_TYPE_CANON_USER + owner_id, None, "FULL_CONTROL")
        return AclPolicy(owner_id, None, grants)
    @staticmethod
    def from_xml(s):
        root = etree.parse(StringIO(s))
        owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS))
        owner_id = owner_id_node.text
        owner_display_name_node = root.find("{%s}Owner/{%s}DisplayName" \
                                        % (NS,NS))
        if (owner_display_name_node != None):
            owner_display_name = owner_display_name_node.text
        else:
            owner_display_name = None
        grantlist = root.findall("{%s}AccessControlList/{%s}Grant" \
            % (NS,NS))
        grants = { }
        for g in grantlist:
            grantee = g.find("{%s}Grantee" % NS)
            user_id = grantee.find("{%s}ID" % NS).text
            if (grantee.attrib.has_key("type")):
                user_type = grantee.attrib["type"]
            else:
                user_type = grantee.attrib["{%s}type" % NS2]
            display_name_node = grantee.find("{%s}DisplayName" % NS)
            if (display_name_node != None):
                display_name = grantee.find("{%s}DisplayName" % NS).text
            else:
                display_name = None
            permission = g.find("{%s}Permission" % NS).text
            grant_user_id = grantee_attribute_to_user_type(user_type) + user_id
            grants[grant_user_id] = AclGrant(grant_user_id, display_name, permission)
        return AclPolicy(owner_id, owner_display_name, grants)
    def to_xml(self):
        root = etree.Element("AccessControlPolicy", nsmap={None: NS})
        owner = etree.SubElement(root, "Owner")
        id_elem = etree.SubElement(owner, "ID")
        id_elem.text = self.owner_id
        if (self.owner_display_name and self.owner_display_name != ""):
            display_name_elem = etree.SubElement(owner, "DisplayName")
            display_name_elem.text = self.owner_display_name
        access_control_list = etree.SubElement(root, "AccessControlList")
        for k,g in self.grants.items():
            grant_elem = etree.SubElement(access_control_list, "Grant")
            grantee_elem = etree.SubElement(grant_elem, "{%s}Grantee" % NS,
                nsmap={None: NS, "xsi" : NS2})
            grantee_elem.set("{%s}type" % NS2, user_type_to_attr(get_user_type(g.user_id)))
            user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS)
            user_id_elem.text = strip_user_type(g.user_id)
            if (g.display_name != None):
                display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS)
                display_name_elem.text = g.display_name
            permission_elem = etree.SubElement(grant_elem, "{%s}Permission" % NS)
            permission_elem.text = g.permission
        return etree.tostring(root, encoding="UTF-8")
    def translate_users(self, xusers):
        # Owner ids are always expressed in terms of canonical user id
        if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)):
            self.owner_id = \
                strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id])
            self.owner_display_name = ""
        for k,g in self.grants.items():
            g.translate_users(xusers)
    def get_all_users(self):
        """ Get a list of all user ids referenced in this ACL """
        users = {}
        users[ACL_TYPE_CANON_USER + self.owner_id] = 1
        for k,g in self.grants.items():
            users[k] = 1
        return users.keys()
    def set_owner(self, owner_id):
        self.owner_id = owner_id
        self.owner_display_name = ""
    def equals(self, rhs):
        if (self.owner_id != rhs.owner_id):
            return False
        for k,g in self.grants.items():
            if (not rhs.grants.has_key(k)):
                return False
            if (not g.equals(rhs.grants[k])):
                return False
        for l,r in rhs.grants.items():
            if (not self.grants.has_key(l)):
                return False
            if (not r.equals(self.grants[l])):
                return False
        return True

def compare_xml(xml1, xml2):
    tree1 = etree.parse(StringIO(xml1))
    out1 = etree.tostring(tree1, encoding="UTF-8")
    tree2 = etree.parse(StringIO(xml2))
    out2 = etree.tostring(tree2, encoding="UTF-8")
    out1 = out1.replace("xsi:type", "type")
    out2 = out2.replace("xsi:type", "type")
    if out1 != out2:
        print "out1 = %s" % out1
        print "out2 = %s" % out2
        raise ObsyncPermanentException("compare xml failed")

#<?xml version="1.0" encoding="UTF-8"?>
def test_acl_policy():
    test1_xml = \
"<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" + \
"<Owner><ID>foo</ID><DisplayName>MrFoo</DisplayName></Owner><AccessControlList>" + \
"<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " + \
"xsi:type=\"CanonicalUser\"><ID>bar</ID>" + \
"<DisplayName>display-name</DisplayName></Grantee>" + \
"<Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>"
    test1 = AclPolicy.from_xml(test1_xml)
    compare_xml(test1_xml, test1.to_xml())

###### Object #######
class Object(object):
    def __init__(self, name, md5, size, meta):
        self.name = name
        self.md5 = md5
        self.size = int(size)
        self.meta = meta
    def equals(self, rhs):
        if (self.name != rhs.name):
            vvprint("EQUALS: self.name = %s, rhs.name = %s" % (self.name, rhs.name))
            return False
        if (self.md5 != rhs.md5):
            vvprint("EQUALS: self.md5 = %s, rhs.md5 = %s" % (self.md5, rhs.md5))
            return False
        if (self.size != rhs.size):
            vvprint("EQUALS: self.size = %d, rhs.size = %d" % (self.size, rhs.size))
            return False
        for k,v in self.meta.items():
            if (not rhs.meta.has_key(k)):
                vvprint("EQUALS: rhs.meta lacks key %s" % k)
                return False
            if (rhs.meta[k] != v):
                vvprint("EQUALS: self.meta[%s] = %s, rhs.meta[%s] = %s" % \
                    (k, v, k, rhs.meta[k]))
                return False
        for k,v in rhs.meta.items():
            if (not self.meta.has_key(k)):
                vvprint("EQUALS: self.meta lacks key %s" % k)
                return False
        vvprint("EQUALS: the objects are equal.")
        return True
    def local_name(self):
        return s3_name_to_local_name(self.name)
    def local_path(self, base):
        return base + "/" + s3_name_to_local_name(self.name)
    @staticmethod
    def from_file(obj_name, path):
        f = open(path, 'r')
        try:
            md5 = get_md5(f)
        finally:
            f.close()
        size = os.path.getsize(path)
        meta = {}
        try:
            xlist = xattr.get_all(path, namespace=xattr.NS_USER)
        except IOError, e:
            if e.errno == 2:
                return meta
            else:
                raise ObsyncTemporaryException(e)
        for k,v in xlist:
            if xattr_is_metadata(k):
                meta[k] = v
        #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size)
        return Object(obj_name, md5, size, meta)

###### Store #######
class Store(object):
    @staticmethod
    def make_store(type, is_dst, create, akey, skey, **kwargs):
        if (type == 's3'):
            if (is_dst):
                is_secure = os.environ.has_key("DST_SECURE")
            else:
                is_secure = os.environ.has_key("SRC_SECURE")
            return S3Store(kwargs['host'], kwargs['bucket'], kwargs['prefix'], create, akey, skey, is_secure)
        elif (type == 'swift'):
            return SwiftStore(kwargs['host'], kwargs['bucket'], kwargs['prefix'], create, akey, skey, False)
        elif (type == 'file'):
            return FileStore(kwargs['path'], create)
        raise ObsyncPermanentException('Unknown store type: "%s"' %type)

###### LocalCopy ######
class LocalCopy(object):
    def __init__(self, obj_name, path, path_is_temp):
        self.obj_name = obj_name
        self.path = path
        self.path_is_temp = path_is_temp
    def remove(self):
        if ((self.path_is_temp == True) and (self.path != None)):
            os.unlink(self.path)
        self.path = None
        self.path_is_temp = False
    def __del__(self):
        self.remove()

class LocalAcl(object):
    @staticmethod
    def from_xml(obj_name, xml):
        acl_policy = AclPolicy.from_xml(xml)
        return LocalAcl(obj_name, acl_policy)
    @staticmethod
    def get_empty(obj_name):
        return LocalAcl(obj_name, None)
    def __init__(self, obj_name, acl_policy):
        self.obj_name = obj_name
        self.acl_policy = acl_policy
    def equals(self, rhs):
        """ Compare two LocalAcls """
        if (self.acl_policy == None):
            return (rhs.acl_policy == None)
        if (rhs.acl_policy == None):
            return (self.acl_policy == None)
        return self.acl_policy.equals(rhs.acl_policy)
    def translate_users(self, xusers):
        """ Translate the users in this ACL """
        if (self.acl_policy == None):
            return
        self.acl_policy.translate_users(xusers)
    def set_owner(self, owner_id):
        if (self.acl_policy == None):
            return
        self.acl_policy.set_owner(owner_id)
    def write_to_xattr(self, file_name):
        """ Write this ACL to an extended attribute """
        if (self.acl_policy == None):
            return
        xml = self.acl_policy.to_xml()
        xattr.set(file_name, ACL_XATTR, xml, namespace=xattr.NS_USER)

###### S3 store #######
def s3_key_to_meta(k):
    meta = {}
    if (k.__dict__.has_key("content_type")):
        meta[CONTENT_TYPE_XATTR] = k.content_type
    for k,v in k.metadata.items():
        meta[META_XATTR_PREFIX + k] = v
    return meta

def meta_to_s3_key(key, meta):
    for k,v in meta.items():
        if (k == CONTENT_TYPE_XATTR):
            key.set_metadata("Content-Type", v)
        elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
            k_name = k[len(META_XATTR_PREFIX):]
            key.set_metadata(k_name, v)
        else:
            raise ObsyncPermanentException("can't understand meta entry: %s" % k)

class S3StoreIterator(object):
    """S3Store iterator"""
    def __init__(self, bucket, blrs):
        self.bucket = bucket
        self.blrs = blrs
    def __iter__(self):
        return self
    def next(self):
        # This will raise StopIteration when there are no more objects to
        # iterate on
        key = self.blrs.next()
        # Issue a HEAD request to get content-type and other metadata
        k = self.bucket.get_key(key.name)
        ret = Object(key.name, etag_to_md5(key.etag), key.size, s3_key_to_meta(k))
        return ret

class S3Store(Store):
    def __init__(self, host, bucket_name, object_prefix, create, akey, skey, is_secure):
        self.host = host
        self.bucket_name = bucket_name
        self.key_prefix = object_prefix

        self.conn = S3Connection(calling_format=OrdinaryCallingFormat(),
                host=self.host, is_secure=is_secure,
                aws_access_key_id=akey, aws_secret_access_key=skey)

        self.bucket = self.conn.lookup(self.bucket_name)
        if (self.bucket == None):
            if (create):
                if (opts.dry_run):
                    raise ObsyncPermanentException("logic error: this should be unreachable.")
                self.bucket = self.conn.create_bucket(bucket_name = self.bucket_name)
            else:
                # Hrm, try again using Subdomain method.  This works
                # around PermanentRedirect.
                self.conn = S3Connection(calling_format=SubdomainCallingFormat(),
                                         host=self.host, is_secure=is_secure,
                                         aws_access_key_id=akey, aws_secret_access_key=skey)
                self.bucket = self.conn.lookup(self.bucket_name)
                if (self.bucket == None):
                    raise ObsyncPermanentException("%s: no such bucket as %s" % \
                                                       (host, self.bucket_name))

    def __str__(self):
        return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix

    def get_acl(self, obj):
        acl_xml = self.bucket.get_xml_acl(obj.name)
        return LocalAcl.from_xml(obj.name, acl_xml)

    def make_local_copy(self, obj):
        k = Key(self.bucket)
        k.key = obj.name
        temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
        try:
            k.get_contents_to_filename(temp_file)
        except Exception, e:
            os.unlink(temp_file)
            raise ObsyncTemporaryException(e)
        return LocalCopy(obj.name, temp_file, True)

    def all_objects(self):
        blrs = self.bucket.list(prefix = self.key_prefix)
        return S3StoreIterator(self.bucket, blrs.__iter__())

    def locate_object(self, obj):
        k = self.bucket.get_key(obj.name)
        if (k == None):
            return None
        return Object(obj.name, etag_to_md5(k.etag), k.size, s3_key_to_meta(k))

    def upload(self, local_copy, src_acl, obj):
        if (opts.more_verbose):
            print "S3Store.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
                "obj='" + obj.name + "'"
        if (opts.dry_run):
            return
        k = Key(self.bucket)
        k.key = obj.name
        meta_to_s3_key(k, obj.meta)
        k.set_contents_from_filename(local_copy.path)
        if (src_acl.acl_policy != None):
            xml = src_acl.acl_policy.to_xml()
            try:
                def fn():
                    self.bucket.set_xml_acl(xml, k)
                do_with_s3_retries(fn)
            except boto.exception.S3ResponseError, e:
                print >>stderr, "ERROR SETTING ACL on object '" + sobj.name + "'"
                print >>stderr
                print >>stderr, "************* ACL: *************"
                print >>stderr, str(xml)
                print >>stderr, "********************************"
                raise ObsyncTemporaryException(e)

    def remove(self, obj):
        if (opts.dry_run):
            return
        self.bucket.delete_key(obj.name)
        if (opts.more_verbose):
            print "S3Store: removed %s" % obj.name

# Some S3 servers offer "eventual consistency."
# What this means is that after a change has been made, like the creation of an
# object, it takes some time for this change to become visible to everyone.
# This potentially includes the client making the change.
#
# This means we need to implement a retry mechanism for certain operations.
# For example, setting the ACL on a newly created object may fail with an
# "object not found" error if the object creation hasn't yet become visible to
# us.
def do_with_s3_retries(fn):
    if (os.environ.has_key("DST_CONSISTENCY") and
            os.environ["DST_CONSISTENCY"] == "eventual"):
        sleep_times = [5, 10, 60, -1]
    else:
        sleep_times = [-1]
    for stime in sleep_times:
        try:
            fn()
            return
        except boto.exception.S3ResponseError, e:
            if (stime == -1):
                raise ObsyncTemporaryException(e)
            if (opts.verbose):
                print "encountered s3 response error: ",
            if (opts.more_verbose):
                print str(e) + ": ",
            if (opts.verbose):
                print "retrying operation after " + str(stime) + \
                    " second delay"
            time.sleep(stime)

###### Swift store #######
def swift_object_to_meta(obj):
    meta = {}
    if (obj.__dict__.has_key("content_type")):
        meta[CONTENT_TYPE_XATTR] = obj.content_type
    for k,v in obj.metadata.items():
        meta[META_XATTR_PREFIX + k] = v
    return meta

def meta_to_swift_object(obj, meta):
    for k,v in meta.items():
        if (k == CONTENT_TYPE_XATTR):
            obj.metadata["Content-Type"] = v
        elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
            k_name = k[len(META_XATTR_PREFIX):]
            obj.metadata[k_name] = v
        else:
            raise ObsyncPermanentException("can't understand meta entry: %s" % k)
        obj.sync_metadata()

class SwiftStoreIterator(object):
    """SwiftStore iterator"""
    def __init__(self, container, object_list):
        self.container = container
        self.generator = (o for o in object_list)
    def __iter__(self):
        return self
    def next(self):
        # This will raise StopIteration when there are no more objects to
        # iterate on
        obj = self.generator.next()
        obj = self.container.get_object(obj.name)
        ret = Object(obj.name, etag_to_md5(obj._etag), obj.size, swift_object_to_meta(obj))
        return ret

class SwiftStore(Store):
    def __init__(self, authurl, container_name, object_prefix, create, akey, skey, is_secure):
        self.host = authurl
        self.container_name = container_name
        self.object_prefix = object_prefix

        self.conn = cloudfiles.get_connection(username=akey, api_key=skey,
            authurl=self.host)
        try:
            self.container = self.conn.get_container(self.container_name)
        except cloudfiles.errors.NoSuchContainer:
            self.container = None

        if (self.container == None):
            if (create):
                if (opts.dry_run):
                    raise ObsyncPermanentException("logic error: this should be unreachable.")
                self.container = self.conn.create_container(self.container_name)
            else:
                raise ObsyncPermanentException("%s: no such container as %s" % \
                    (self.host, self.container_name))

    def __str__(self):
        return "swift://" + self.host + "/" + self.container_name + "/" + self.object_prefix

    def get_acl(self, obj):
        raise NotImplementedError # We always use --no-preserve-acls right now
    def make_local_copy(self, obj):
        o = self.container.get_object(obj.name)
        temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
        try:
            o.save_to_filename(temp_file)
        except Exception, e:
            os.unlink(temp_file)
            raise ObsyncTemporaryException(e)
        return LocalCopy(obj.name, temp_file, True)

    def all_objects(self):
        object_list = self.container.get_objects(prefix = self.object_prefix)
        return SwiftStoreIterator(self.container, object_list)

    def locate_object(self, obj):
        try:
            o = self.container.get_object(obj.name)
        except cloudfiles.errors.NoSuchObject:
            return None
        return Object(obj.name, etag_to_md5(o._etag), o.size, swift_object_to_meta(o))

    def upload(self, local_copy, src_acl, obj):
        if (opts.more_verbose):
            print "SwiftStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
                "obj='" + obj.name + "'"
        if (opts.dry_run):
            return
        o = self.container.create_object(obj.name)
        o.load_from_filename(local_copy.path)
        meta_to_swift_object(o, obj.meta)
        if (src_acl.acl_policy != None):
            #FIXME: no acls for swift yet
            pass

    def remove(self, obj):
        if (opts.dry_run):
            return
        self.container.delete_object(obj.name)
        if (opts.more_verbose):
            print "SwiftStore: removed %s" % obj.name

###### FileStore #######
class FileStoreIterator(object):
    """FileStore iterator"""
    def __init__(self, base):
        self.base = base
        if (opts.follow_symlinks):
            self.generator = os.walk(base, followlinks=True)
        else:
            self.generator = os.walk(base)
        self.path = ""
        self.files = []
    def __iter__(self):
        return self
    def next(self):
        while True:
            if (len(self.files) == 0):
                self.path, dirs, self.files = self.generator.next()
                continue
            path = self.path + "/" + self.files[0]
            self.files = self.files[1:]
            # Ignore non-files when iterating.
            if (not os.path.isfile(path)):
                continue
            obj_name = local_name_to_s3_name(path[len(self.base)+1:])
            return Object.from_file(obj_name, path)

class FileStore(Store):
    def __init__(self, url, create):
        # Parse the file url
        self.base = url
        if (self.base[-1:] == '/'):
            self.base = self.base[:-1]
        if (create):
            if (opts.dry_run):
                raise ObsyncPermanentException("logic error: this should be unreachable.")
            mkdir_p(self.base)
        elif (not os.path.isdir(self.base)):
            raise ObsyncPermanentException("NonexistentStore")
        Store.__init__(self, "file://" + url)
        test_xattr_support(self.base)
    def __str__(self):
        return "file://" + self.base
    def get_acl(self, obj):
        try:
            xml = xattr.get(obj.local_path(self.base), ACL_XATTR,
                            namespace=xattr.NS_USER)
        except IOError, e:
            #print "failed to get XML ACL from %s" % obj.local_name()
            if e.errno == 61:
                return LocalAcl.get_empty(obj.name)
            raise ObsyncPermanentException(e)
        return LocalAcl.from_xml(obj.name, xml)
    def make_local_copy(self, obj):
        return LocalCopy(obj.name, obj.local_path(self.base), False)
    def all_objects(self):
        return FileStoreIterator(self.base)
    def locate_object(self, obj):
        path = obj.local_path(self.base)
        found = os.path.isfile(path)
        if (opts.more_verbose):
            if (found):
                print "FileStore::locate_object: found object '" + \
                    obj.name + "'"
            else:
                print "FileStore::locate_object: did not find object '" + \
                    obj.name + "'"
        if (not found):
            return None
        return Object.from_file(obj.name, path)
    def upload(self, local_copy, src_acl, obj):
        if (opts.more_verbose):
            print "FileStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
                "obj='" + obj.name + "'"
        if (opts.dry_run):
            return
        s = local_copy.path
        lname = obj.local_name()
        d = self.base + "/" + lname
        mkdir_p(os.path.dirname(d))
        shutil.copy(s, d)
        src_acl.write_to_xattr(d)
        # Store metadata in extended attributes
        for k,v in obj.meta.items():
            xattr.set(d, k, v, namespace=xattr.NS_USER)
    def remove(self, obj):
        if (opts.dry_run):
            return
        os.unlink(self.base + "/" + obj.name)
        if (opts.more_verbose):
            print "FileStore: removed %s" % obj.name

###### Functions #######
def delete_unreferenced(src, dst):
    """ delete everything from dst that is not referenced in src """
    if (opts.more_verbose):
        print "handling deletes."
    currently_handling = "destination"
    try:
        dst_all_objects = dst.all_objects()
        while True:
            dobj = dst_all_objects.next()
            currently_handling = "source"
            sobj = src.locate_object(dobj)
            currently_handling = "destination"
            if (sobj == None):
                dst.remove(dobj)
    except StopIteration:
        pass
    except Exception, e:
        print_obsync_exception_and_abort(e, currently_handling)

def xuser_cb(opt, opt_str, value, parser):
    """ handle an --xuser argument """
    equals = value.find(r'=')
    if equals == -1:
        raise ObsyncArgumentParsingException("Error parsing --xuser: You must \
give both a source and destination user name, like so:\n\
--xuser SOURCE_USER=DEST_USER\n\
\n\
This will translate the user SOURCE_USER in the source to the user DEST_USER \n\
in the destination.")
    src_user = value[:equals]
    dst_user = value[equals+1:]
    if ((len(src_user) == 0) or (len(dst_user) == 0)):
        raise ObsyncArgumentParsingException("Error parsing --xuser: \
can't have a zero-length user name.")
    src_user = add_user_type(src_user)
    dst_user = add_user_type(dst_user)
    if (xuser.has_key(src_user)):
        raise ObsyncArgumentParsingException("Error parsing --xuser: \
we are already translating \"%s\" to \"%s\"; we cannot translate it \
to \"%s\"" % (src_user, xuser[src_user], dst_user))
    xuser[src_user] = dst_user

############Begin#############
try:
    currently_handling = "unknown"
    parser = OptionParser(USAGE)
    parser.add_option("--src-type", dest="src_type", type="string",
        action="store", help="type of data  store for source. Can be " + \
        "'file', 's3', or 'swift'")
    parser.add_option("--src-path", dest="src_path", type="string",
        action="store", help="local file path for source")
    parser.add_option("--src-host", dest="src_host", type="string",
        action="store", help="host to connect to for source data")
    parser.add_option("--src-authurl", dest="src_authurl", type="string",
        action="store", help="authurl to connect to for source data")
    parser.add_option("--src-bucket", dest="src_bucket", type="string",
        action="store", help="source bucket")
    parser.add_option("--src-prefix", dest="src_prefix", type="string",
        action="store", help="object prefix for the source")
    parser.add_option("--dst-type", dest="dst_type", type="string",
        action="store", help="type of data  store for destination. Can be " + \
        "'file', 's3', or 'swift'")
    parser.add_option("--dst-path", dest="dst_path", type="string",
        action="store", help="local file path for destination")
    parser.add_option("--dst-host", dest="dst_host", type="string",
        action="store", help="host to connect to for destination data")
    parser.add_option("--dst-authurl", dest="dst_authurl", type="string",
        action="store", help="authurl to connect to for destination data")
    parser.add_option("--dst-bucket", dest="dst_bucket", type="string",
        action="store", help="destination bucket")
    parser.add_option("--dst-prefix", dest="dst_prefix", type="string",
        action="store", help="object prefix for the destination")
    parser.add_option("-n", "--dry-run", action="store_true", \
        dest="dry_run", default=False)
    parser.add_option("-c", "--create-dest", action="store_true", \
        dest="create", help="create the destination if it doesn't already exist")
    parser.add_option("--delete-before", action="store_true", \
        dest="delete_before", help="delete objects that aren't in SOURCE from \
    DESTINATION before transferring any objects")
    parser.add_option("--boto-retries", dest="boto_retries", type="int",
        help="set number of times we'll retry the same S3 operation")
    parser.add_option("-d", "--delete-after", action="store_true", \
        dest="delete_after", help="delete objects that aren't in SOURCE from \
    DESTINATION after doing all transfers.")
    parser.add_option("-L", "--follow-symlinks", action="store_true", \
        dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \
        "loops when using this option!)")
    parser.add_option("--no-preserve-acls", action="store_true", \
        dest="no_preserve_acls", help="don't preserve ACLs when copying objects.")
    parser.add_option("-v", "--verbose", action="store_true", \
        dest="verbose", help="be verbose")
    parser.add_option("-V", "--more-verbose", action="store_true", \
        dest="more_verbose", help="be really, really verbose (developer mode)")
    parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \
        dest="SRC=DST", callback=xuser_cb, help="set up a user tranlation. You \
    can specify multiple user translations with multiple --xuser arguments.")
    parser.add_option("--force", action="store_true", \
        dest="force", help="overwrite all destination objects, even if they \
    appear to be the same as the source objects.")
    parser.add_option("--unit", action="store_true", \
        dest="run_unit_tests", help="run unit tests and quit")
    xuser = {}
    (opts, args) = parser.parse_args()
    if (opts.run_unit_tests):
        test_acl_policy()
        sys.exit(0)

    if opts.boto_retries != None:
        if not boto.config.has_section('Boto'):
            boto.config.add_section('Boto')
        boto.config.set('Boto', 'num_retries', str(opts.boto_retries))

    if (not opts.src_type):
        raise ObsyncArgumentParsingException("src-type is required!")
    if (opts.src_type == 'file'):
        if (opts.src_host or opts.src_bucket or opts.src_prefix or opts.src_authurl):
            raise ObsyncArgumentParsingException("host, bucket, prefix, and " + \
                "authurl not required for local file stores.")
        if (not opts.src_path):
            raise ObsyncArgumentParsingException("src-path is required!")
    else:
        if (opts.src_type == 's3'):
            if (opts.src_authurl):
                raise ObsyncArgumentParsingException("src-authurl is not used for s3 stores!")
            if (not opts.src_host):
                raise ObsyncArgumentParsingException("src-host is required for s3 stores!")
        if (opts.src_type == 'swift'):
            if not cloudfiles:
                raise ObsyncArgumentParsingException("cloudfiles library not found. Swift syncing unavailable")
            if (not opts.src_authurl):
                raise ObsyncArgumentParsingException("src-authurl is required for swift stores!")
            if (opts.src_host):
                raise ObsyncArgumentParsingException("src-host is not used for swift stores!")
        if (not opts.src_bucket):
            raise ObsyncArgumentParsingException("src-bucket is required!")

    if (not opts.dst_type):
        raise ObsyncArgumentParsingException("dst-type is required!")
    if (opts.dst_type == 'file'):
        if (opts.dst_host or opts.dst_bucket or opts.dst_prefix or opts.dst_authurl):
            raise ObsyncArgumentParsingException("host, bucket, prefix, and " + \
                "authurl not required for local file stores.")
        if (not opts.dst_path):
            raise ObsyncArgumentParsingException("dst-path is required!")
    else:
        if (opts.dst_type == 's3'):
            if (opts.dst_authurl):
                raise ObsyncArgumentParsingException("dst-authurl is not used for s3 stores!")
            if (not opts.dst_host):
                raise ObsyncArgumentParsingException("dst-host is required for s3 stores!")
        if (opts.dst_type == 'swift'):
            if not cloudfiles:
                raise ObsyncArgumentParsingException("cloudfiles library not found. Swift syncing unavailable")
            if (not opts.dst_authurl):
                raise ObsyncArgumentParsingException("dst-authurl is required for swift stores!")
            if (opts.dst_host):
                raise ObsyncArgumentParsingException("dst-host is not used for swift stores!")
        if (not opts.dst_bucket):
            raise ObsyncArgumentParsingException("dst-bucket is required!")

    if opts.src_authurl:
        opts.src_host = opts.src_authurl
    if opts.dst_authurl:
        opts.dst_host = opts.dst_authurl
    opts.preserve_acls = not opts.no_preserve_acls
    if (opts.create and opts.dry_run):
        raise ObsyncArgumentParsingException("You can't run with both \
--create-dest and --dry-run! By definition, a dry run never changes anything.")

    if (opts.more_verbose):
        print >>stderr, "User translations:"
        for k,v in xuser.items():
            print >>stderr, "\"%s\" ==> \"%s\"" % (k, v)
        print >>stderr, ""
    if (opts.more_verbose):
        opts.verbose = True
        boto.set_stream_logger("stdout")
        boto.log.info("Enabling verbose boto logging.")
    if (opts.delete_before and opts.delete_after):
        raise ObsyncArgumentParsingException("It doesn't make sense to \
specify both --delete-before and --delete-after.")

    currently_handling = "source"
    if (opts.more_verbose):
        print "SOURCE: %s, %s, %s (%s)" % (opts.src_host, opts.src_bucket,
                opts.src_prefix, opts.src_type)
    try:
        src = Store.make_store(opts.src_type, False, False,
                getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"),
                host=opts.src_host, bucket=opts.src_bucket,
                prefix=opts.src_prefix, path=opts.src_path)
    except ObsyncException, e:
        if (e.comment == "NonexistentStore"):
            e.comment = "Fatal error: Source " + opts.src_host + " does " +\
                "not appear to exist."
        raise

    currently_handling = "destination"
    if (opts.more_verbose):
        print "DESTINATION: %s, %s, %s (%s)" % (opts.dst_host, opts.dst_bucket,
                opts.dst_prefix, opts.dst_type)
    try:
        dst = Store.make_store(opts.dst_type, True, opts.create,
                getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"),
                host=opts.dst_host, bucket=opts.dst_bucket,
                prefix=opts.dst_prefix, path=opts.dst_path)
    except ObsyncException, e:
        if (e.comment == "NonexistentStore"):
            e.comment = "Fatal error: Destination does not exist. Run " + \
                "with -c or --create-dest to create it automatically."
        raise

    if (opts.delete_before):
        delete_unreferenced(src, dst)

    currently_handling = "source"
    src_all_objects = src.all_objects()
    while True:
        currently_handling = "source"
        try:
            sobj = src_all_objects.next()
        except StopIteration:
            break
        if (opts.more_verbose):
            print "handling " + sobj.name
        pline = ""
        currently_handling = "destination"
        dobj = dst.locate_object(sobj)
        upload = False
        src_acl = None
        dst_acl = None
        if (opts.force):
            if (opts.verbose):
                pline += "F " + sobj.name
            upload = True
        elif (dobj == None):
            if (opts.verbose):
                pline += "+ " + sobj.name
            upload = True
        elif not sobj.equals(dobj):
            if (opts.verbose):
                pline += "> " + sobj.name
            upload = True
        elif (opts.preserve_acls):
            # Do the ACLs match?
            currently_handling = "source"
            src_acl = src.get_acl(sobj)
            currently_handling = "destination"
            dst_acl = dst.get_acl(dobj)
            currently_handling = "source"
            src_acl.translate_users(xuser)
            #src_acl.set_owner()
            if (not src_acl.equals(dst_acl)):
                upload = True
                if (opts.verbose):
                    pline += "^ %s" % sobj.name
        else:
            if (opts.verbose):
                pline += ". " + sobj.name
        if (upload):
            if (not opts.preserve_acls):
                # Just default to an empty ACL
                src_acl = LocalAcl.get_empty(sobj.name)
            else:
                if (src_acl == None):
                    currently_handling = "source"
                    src_acl = src.get_acl(sobj)
                    src_acl.translate_users(xuser)
                    #src_acl.set_owner()
            currently_handling = "source"
            local_copy = src.make_local_copy(sobj)
            try:
                currently_handling = "destination"
                dst.upload(local_copy, src_acl, sobj)
            finally:
                local_copy.remove()
        if (pline != ""):
            print pline

    if (opts.delete_after):
        delete_unreferenced(src, dst)
except Exception, e:
    print_obsync_exception_and_abort(e, currently_handling)

if (opts.more_verbose):
    print "finished."
sys.exit(0)