1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
import unittest
# Pull the shared test helpers into this module's namespace.  Python 2's
# execfile() with a single argument runs the file in the caller's scope,
# so every name it defines becomes a global of this module.
execfile("commontest.py")
# rbexec is not defined in this file; presumably it is provided by
# commontest.py and loads main.py in a similar fashion -- TODO confirm.
rbexec("main.py")
# Log at verbosity level 7.  NOTE(review): the meaning of the level is
# defined by Log, which is not visible here.
Log.setverbosity(7)
# Shorthand for the local connection; the tests below pass it to RPath().
lc = Globals.local_connection
class RestoreTest(unittest.TestCase):
    """Test Restore class

    The tests read fixtures below ``testfiles/`` (relative to the current
    working directory) and write their results to ``testfiles/outfile``
    and ``testfiles/output``.  RPath, Restore, Time, Globals, os and the
    Myrm / InternalRestore / CompareRecursive helpers are not imported
    here; they are expected to be in the module namespace already.

    """

    # Directory holding the single-file restore fixtures.
    prefix = "testfiles/restoretest/"

    def maketesttuples(self, basename):
        """Make testing tuples from available files starting with prefix

        Lists ``self.prefix``, keeps the entries whose name starts with
        ``basename`` and, of those, the ones for which ``isincfile()`` is
        true.

        Returns ``(tuples, incs)``: ``incs`` is the list of increment
        RPaths and ``tuples`` is a list of ``(inc_rp, expected_rp)``
        pairs, where ``expected_rp`` is the RPath named
        ``"<incbase path>.<inctime>"`` -- presumably a copy of the file
        as it should look when restored to that time (verify against the
        fixture layout).

        """
        dirlist = os.listdir(self.prefix)
        rps = [RPath(lc, self.prefix + name)
               for name in dirlist if name.startswith(basename)]
        incs = [rp for rp in rps if rp.isincfile()]
        tuples = [(rp, RPath(lc, "%s.%s" % (rp.getincbase().path,
                                            rp.getinctime())))
                  for rp in incs]
        return tuples, incs

    def restoreonefiletest(self, basename):
        """Restore basename to each increment's time and check the result

        For every ``(inc_rp, expected_rp)`` pair from maketesttuples()
        the file is restored to ``testfiles/outfile`` as of the
        increment's time and compared with ``expected_rp``: either both
        must be absent, or both must exist and agree under RPath.cmp and
        RPath.cmp_attribs.

        """
        tuples, incs = self.maketesttuples(basename)
        rpbase = RPath(lc, self.prefix + basename)
        rptarget = RPath(lc, "testfiles/outfile")

        for inc_rp, expected_rp in tuples:
            # Parenthesised single argument prints identically on Python 2.
            print("Processing file " + inc_rp.path)
            # Start every iteration without a leftover target.
            if rptarget.lstat():
                rptarget.delete()
            rest_time = Time.stringtotime(inc_rp.getinctime())
            sorted_incs = Restore.sortincseq(rest_time, incs)
            Restore.RestoreFile(rest_time, rpbase, (), sorted_incs, rptarget)
            # Refresh the cached stat data now that the restore has run.
            rptarget.setdata()

            if not rptarget.lstat():
                # Nothing was restored, so nothing should be expected.
                assert not expected_rp.lstat()
            elif not expected_rp.lstat():
                # The target exists on this branch, so this assertion is
                # how "restored a file that should be absent" is reported.
                assert not rptarget.lstat()
            else:
                assert RPath.cmp(rptarget, expected_rp), \
                       "%s %s" % (rptarget.path, expected_rp.path)
                assert RPath.cmp_attribs(rptarget, expected_rp), \
                       "%s %s" % (rptarget.path, expected_rp.path)
                # NOTE(review): kept in this branch because it is the
                # only one where the target is known to exist.
                rptarget.delete()

    def testsortincseq(self):
        """Test the Restore.sortincseq function

        This test just makes sure that it comes up with the right
        number of increments for each base name - given a list of
        increments, we should eventually get sorted sequences that
        end in each one (each one will be the last increment once).

        """
        for basename in ['ocaml', 'mf']:
            tuples, incs = self.maketesttuples(basename)
            # Paths of the increments that have ended a sorted sequence.
            completed_dict = {}
            for inc_rp, _expected_rp in tuples:
                rest_time = Time.stringtotime(inc_rp.getinctime())
                sorted_incs = Restore.sortincseq(rest_time, incs)
                key = sorted_incs[-1].path
                # Each increment may be the final one only once.
                assert key not in completed_dict, key
                completed_dict[key] = 1
            # ... and every increment must have been the final one.
            for inc in incs:
                assert inc.path in completed_dict, inc.path

    def testRestorefiles(self):
        """Testing restoration of files one at a time"""
        # Explicit loop: the calls are made for their side effects only.
        for basename in ["ocaml", "mf"]:
            self.restoreonefiletest(basename)

    def testRestoreDir(self):
        """Test restoring from a real backup set"""
        Myrm("testfiles/output")
        InternalRestore(1, 1, "testfiles/restoretest3",
                        "testfiles/output", 20000)

        src_rp = RPath(Globals.local_connection, "testfiles/increment2")
        restore_rp = RPath(Globals.local_connection, "testfiles/output")
        assert CompareRecursive(src_rp, restore_rp)

    def testRestoreCorrupt(self):
        """Test restoring a partially corrupt archive

        The problem here is that a directory is missing from what is
        to be restored, but because the previous backup was aborted in
        the middle, some of the files in that directory weren't marked
        as .missing.

        """
        Myrm("testfiles/output")
        InternalRestore(1, 1, "testfiles/restoretest4", "testfiles/output",
                        10000)
        # The output directory itself must exist ...
        assert os.lstat("testfiles/output")
        # ... but neither of these entries may have been restored into it.
        self.assertRaises(OSError, os.lstat, "testfiles/output/tmp")
        self.assertRaises(OSError, os.lstat, "testfiles/output/rdiff-backup")
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|