From 8c37a5bdfdd46d5cfad6e9d67925ddef9ca382bf Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 21 Mar 2002 07:22:43 +0000 Subject: First checkin git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup/trunk@2 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109 --- rdiff-backup/testing/chdir-wrapper | 15 + rdiff-backup/testing/commontest.py | 19 ++ rdiff-backup/testing/connectiontest.py | 201 +++++++++++ rdiff-backup/testing/destructive_steppingtest.py | 72 ++++ rdiff-backup/testing/filelisttest.py | 35 ++ rdiff-backup/testing/finaltest.py | 150 +++++++++ rdiff-backup/testing/find-max-ram.py | 60 ++++ rdiff-backup/testing/highleveltest.py | 75 +++++ rdiff-backup/testing/incrementtest.py | 100 ++++++ rdiff-backup/testing/iterfiletest.py | 27 ++ rdiff-backup/testing/lazytest.py | 326 ++++++++++++++++++ rdiff-backup/testing/rdifftest.py | 127 +++++++ rdiff-backup/testing/regressiontest.py | 410 +++++++++++++++++++++++ rdiff-backup/testing/restoretest.py | 47 +++ rdiff-backup/testing/rlisttest.py | 98 ++++++ rdiff-backup/testing/robusttest.py | 86 +++++ rdiff-backup/testing/roottest.py | 165 +++++++++ rdiff-backup/testing/rorpitertest.py | 105 ++++++ rdiff-backup/testing/rpathtest.py | 337 +++++++++++++++++++ rdiff-backup/testing/server.py | 12 + rdiff-backup/testing/setconnectionstest.py | 26 ++ rdiff-backup/testing/statictest.py | 63 ++++ rdiff-backup/testing/testall.py | 26 ++ rdiff-backup/testing/timetest.py | 71 ++++ 24 files changed, 2653 insertions(+) create mode 100755 rdiff-backup/testing/chdir-wrapper create mode 100644 rdiff-backup/testing/commontest.py create mode 100644 rdiff-backup/testing/connectiontest.py create mode 100644 rdiff-backup/testing/destructive_steppingtest.py create mode 100644 rdiff-backup/testing/filelisttest.py create mode 100644 rdiff-backup/testing/finaltest.py create mode 100755 rdiff-backup/testing/find-max-ram.py create mode 100644 rdiff-backup/testing/highleveltest.py create mode 100644 rdiff-backup/testing/incrementtest.py create mode 100644 rdiff-backup/testing/iterfiletest.py create mode 100644 rdiff-backup/testing/lazytest.py create mode 100644 rdiff-backup/testing/rdifftest.py create mode 100644 rdiff-backup/testing/regressiontest.py create mode 100644 rdiff-backup/testing/restoretest.py create mode 100644 rdiff-backup/testing/rlisttest.py create mode 100644 rdiff-backup/testing/robusttest.py create mode 100644 rdiff-backup/testing/roottest.py create mode 100644 rdiff-backup/testing/rorpitertest.py create mode 100644 rdiff-backup/testing/rpathtest.py create mode 100755 rdiff-backup/testing/server.py create mode 100644 rdiff-backup/testing/setconnectionstest.py create mode 100644 rdiff-backup/testing/statictest.py create mode 100644 rdiff-backup/testing/testall.py create mode 100644 rdiff-backup/testing/timetest.py (limited to 'rdiff-backup/testing') diff --git a/rdiff-backup/testing/chdir-wrapper b/rdiff-backup/testing/chdir-wrapper new file mode 100755 index 0000000..413fcf2 --- /dev/null +++ b/rdiff-backup/testing/chdir-wrapper @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +"""Used to emulate a remote connection by changing directories. + +If given an argument, will change to that directory, and then start +the server. Otherwise will start the server without a chdir. + +""" + +execfile("commontest.py") +rbexec("setconnections.py") + +if len(sys.argv) > 1: os.chdir(sys.argv[1]) +PipeConnection(sys.stdin, sys.stdout).Server() + diff --git a/rdiff-backup/testing/commontest.py b/rdiff-backup/testing/commontest.py new file mode 100644 index 0000000..5cd66d7 --- /dev/null +++ b/rdiff-backup/testing/commontest.py @@ -0,0 +1,19 @@ +"""commontest - Some functions and constants common to all test cases""" +import os + +SourceDir = "../src" +AbsCurdir = os.getcwd() # Absolute path name of current directory +AbsTFdir = AbsCurdir+"/testfiles" +MiscDir = "../misc" + +def rbexec(src_file): + """Changes to the source directory, execfile src_file, return""" + os.chdir(SourceDir) + execfile(src_file, globals()) + os.chdir(AbsCurdir) + +def Make(): + """Make sure the rdiff-backup script in the source dir is up-to-date""" + os.chdir(SourceDir) + os.system("python ./Make") + os.chdir(AbsCurdir) diff --git a/rdiff-backup/testing/connectiontest.py b/rdiff-backup/testing/connectiontest.py new file mode 100644 index 0000000..bcd98ea --- /dev/null +++ b/rdiff-backup/testing/connectiontest.py @@ -0,0 +1,201 @@ +import unittest, types, tempfile, os, sys +execfile("commontest.py") +rbexec("setconnections.py") + + +class LocalConnectionTest(unittest.TestCase): + """Test the dummy connection""" + lc = Globals.local_connection + + def testGetAttrs(self): + """Test getting of various attributes""" + assert type(self.lc.LocalConnection) is types.ClassType + try: self.lc.asotnuhaoseu + except NameError: pass + else: unittest.fail("NameError should be raised") + + def testSetattrs(self): + """Test setting of global attributes""" + self.lc.x = 5 + assert self.lc.x == 5 + self.lc.x = 7 + assert self.lc.x == 7 + + def testDelattrs(self): + """Testing deletion of attributes""" + self.lc.x = 5 + del self.lc.x + try: self.lc.x + except NameError: pass + else: unittest.fail("No exception raised") + + def testReval(self): + """Test string evaluation""" + assert self.lc.reval("pow", 2, 3) == 8 + + +class LowLevelPipeConnectionTest(unittest.TestCase): + """Test LLPC class""" + objs = ["Hello", ("Tuple", "of", "strings"), + [1, 2, 3, 4], 53.34235] + excts = [TypeError("te"), NameError("ne"), os.error("oe")] + filename = tempfile.mktemp() + + def testObjects(self): + """Try moving objects across connection""" + outpipe = open(self.filename, "w") + LLPC = LowLevelPipeConnection(None, outpipe) + for obj in self.objs: LLPC._putobj(obj, 3) + outpipe.close() + inpipe = open(self.filename, "r") + LLPC.inpipe = inpipe + for obj in self.objs: + gotten = LLPC._get() + assert gotten == (3, obj), gotten + inpipe.close + os.unlink(self.filename) + + def testBuf(self): + """Try moving a buffer""" + outpipe = open(self.filename, "w") + LLPC = LowLevelPipeConnection(None, outpipe) + inbuf = open("testfiles/various_file_types/regular_file", "r").read() + LLPC._putbuf(inbuf, 234) + outpipe.close() + inpipe = open(self.filename, "r") + LLPC.inpipe = inpipe + assert (234, inbuf) == LLPC._get() + inpipe.close() + os.unlink(self.filename) + + def testSendingExceptions(self): + """Exceptions should also be sent down pipe well""" + outpipe = open(self.filename, "w") + LLPC = LowLevelPipeConnection(None, outpipe) + for exception in self.excts: LLPC._putobj(exception, 0) + outpipe.close() + inpipe = open(self.filename, "r") + LLPC.inpipe = inpipe + for exception in self.excts: + incoming_exception = LLPC._get() + assert isinstance(incoming_exception[1], exception.__class__) + inpipe.close() + os.unlink(self.filename) + + +class PipeConnectionTest(unittest.TestCase): + """Test Pipe connection""" + regfilename = "testfiles/various_file_types/regular_file" + + def setUp(self): + """Must start a server for this""" + stdin, stdout = os.popen2("./server.py") + self.conn = PipeConnection(stdout, stdin) + #self.conn.Log.setverbosity(9) + #Log.setverbosity(9) + + def testBasic(self): + """Test some basic pipe functions""" + assert self.conn.ord("a") == 97 + assert self.conn.pow(2,3) == 8 + assert self.conn.reval("ord", "a") == 97 + + def testModules(self): + """Test module emulation""" + assert type(self.conn.tempfile.mktemp()) is types.StringType + assert self.conn.os.path.join("a", "b") == "a/b" + rp1 = RPath(self.conn, self.regfilename) + assert rp1.isreg() + + def testVirtualFiles(self): + """Testing virtual files""" + tempout = self.conn.open("testfiles/tempout", "w") + assert isinstance(tempout, VirtualFile) + regfilefp = open(self.regfilename, "r") + RPath.copyfileobj(regfilefp, tempout) + tempout.close() + regfilefp.close() + tempoutlocal = open("testfiles/tempout", "r") + regfilefp = open(self.regfilename, "r") + assert RPath.cmpfileobj(regfilefp, tempoutlocal) + tempoutlocal.close() + regfilefp.close() + os.unlink("testfiles/tempout") + + assert RPath.cmpfileobj(self.conn.open(self.regfilename, "r"), + open(self.regfilename, "r")) + + def testString(self): + """Test transmitting strings""" + assert "32" == self.conn.str(32) + assert 32 == self.conn.int("32") + + def testIterators(self): + """Test transmission of iterators""" + i = iter(map(RORPsubstitute, range(10))) + assert self.conn.hasattr(i, "next") + datastring = self.conn.reval("lambda i: i.next().data", i) + assert datastring == "Hello, there 0", datastring + + def testRPaths(self): + """Test transmission of rpaths""" + rp = RPath(self.conn, "testfiles/various_file_types/regular_file") + assert self.conn.reval("lambda rp: rp.data", rp) == rp.data + assert self.conn.reval("lambda rp: rp.conn is Globals.local_connection", rp) + + def testExceptions(self): + """Test exceptional results""" + self.assertRaises(os.error, self.conn.os.lstat, + "asoeut haosetnuhaoseu tn") + self.assertRaises(SyntaxError, self.conn.reval, + "aoetnsu aoehtnsu") + assert self.conn.pow(2,3) == 8 + + def tearDown(self): + """Bring down connection""" + self.conn.quit() + + +class RedirectedConnectionTest(unittest.TestCase): + """Test routing and redirection""" + def setUp(self): + """Must start two servers for this""" + #Log.setverbosity(9) + self.conna = SetConnections.init_connection("./server.py") + self.connb = SetConnections.init_connection("./server.py") + + def testBasic(self): + """Test basic operations with redirection""" + self.conna.Globals.set("tmp_connb", self.connb) + self.connb.Globals.set("tmp_conna", self.conna) + assert self.conna.Globals.get("tmp_connb") is self.connb + assert self.connb.Globals.get("tmp_conna") is self.conna + + self.conna.Test_SetConnGlobals(self.connb, "tmp_settest", 1) + assert self.connb.Globals.get("tmp_settest") + + assert self.conna.reval("Globals.get('tmp_connb').pow", 2, 3) == 8 + self.conna.reval("Globals.tmp_connb.reval", + "Globals.tmp_conna.Globals.set", "tmp_marker", 5) + assert self.conna.Globals.get("tmp_marker") == 5 + + def testRpaths(self): + """Test moving rpaths back and forth across connections""" + rp = RPath(self.conna, "foo") + self.connb.Globals.set("tmp_rpath", rp) + rp_returned = self.connb.Globals.get("tmp_rpath") + assert rp_returned.conn is rp.conn + assert rp_returned.path == rp.path + + def tearDown(self): + SetConnections.CloseConnections() + +class RORPsubstitute: + """Used in testIterators above to simulate a RORP""" + def __init__(self, i): + self.index = i + self.data = "Hello, there %d" % i + self.file = None + +if __name__ == "__main__": + unittest.main() diff --git a/rdiff-backup/testing/destructive_steppingtest.py b/rdiff-backup/testing/destructive_steppingtest.py new file mode 100644 index 0000000..528561d --- /dev/null +++ b/rdiff-backup/testing/destructive_steppingtest.py @@ -0,0 +1,72 @@ +from __future__ import generators +import unittest +execfile("commontest.py") +rbexec("destructive_stepping.py") + + + +class DSTest(unittest.TestCase): + def setUp(self): + self.lc = Globals.local_connection + self.noperms = RPath(self.lc, "testfiles/noperms") + Globals.change_source_perms = 1 + self.iteration_dir = RPath(self.lc, "testfiles/iteration-test") + + def testDSIter(self): + """Testing destructive stepping iterator from baserp""" + for i in range(2): + ds_iter = DestructiveStepping.Iterate_with_Finalizer( + self.noperms, 1) + noperms = ds_iter.next() + assert noperms.isdir() and noperms.getperms() == 0 + + bar = ds_iter.next() + assert bar.isreg() and bar.getperms() == 0 + barbuf = bar.open("rb").read() + assert len(barbuf) > 0 + + foo = ds_iter.next() + assert foo.isreg() and foo.getperms() == 0 + assert foo.getmtime() < 1000300000 + + fuz = ds_iter.next() + assert fuz.isreg() and fuz.getperms() == 0200 + fuzbuf = fuz.open("rb").read() + assert len(fuzbuf) > 0 + + self.assertRaises(StopIteration, ds_iter.next) + + def testIterate_from(self): + """Tests basic iteration by Iterate_from""" + iter = DestructiveStepping.Iterate_from(self.iteration_dir, 1) + l = [] + for rp in iter: l.append(rp.index) + assert l == [(), + ('1',), + ('2',), + ('3',), ('3','2'), ('3','3'), + ('4',), + ('5',), ('5','1'), ('5','2'), ('5','2','1'), + ('6',), ('6','3'), + ('6','3','1'), ('6','3','2'), ('6','4'), + ('7',)], l + + def testIterate_from_index(self): + """Test iteration from a given index""" + iter = DestructiveStepping.Iterate_from(self.iteration_dir, 1, ('3',)) + l = [] + for rp in iter: l.append(rp.index) + assert l == [('3','2'), ('3','3'), + ('4',), + ('5',), ('5','1'), ('5','2'), ('5','2','1'), + ('6',), ('6','3'), + ('6','3','1'), ('6','3','2'), ('6','4'), + ('7',)], l + iter = DestructiveStepping.Iterate_from(self.iteration_dir, 1, + ('6','3')) + l = [] + for rp in iter: l.append(rp.index) + assert l == [('6','3','1'), ('6','3','2'), ('6', '4'), + ('7',)], l + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/filelisttest.py b/rdiff-backup/testing/filelisttest.py new file mode 100644 index 0000000..f6166ed --- /dev/null +++ b/rdiff-backup/testing/filelisttest.py @@ -0,0 +1,35 @@ +import unittest, StringIO +execfile("commontest.py") +rbexec("filelist.py") + + +class FilelistTest(unittest.TestCase): + """Test Filelist class""" + def testFile2Iter(self): + """Test File2Iter function""" + filelist = """ +hello +goodbye +a/b/c + +test""" + baserp = RPath(Globals.local_connection, "/base") + i = Filelist.File2Iter(StringIO.StringIO(filelist), baserp) + assert i.next().path == "/base/hello" + assert i.next().path == "/base/goodbye" + assert i.next().path == "/base/a/b/c" + assert i.next().path == "/base/test" + self.assertRaises(StopIteration, i.next) + + def testmake_subdirs(self): + """Test Filelist.make_subdirs""" + self.assertRaises(os.error, os.lstat, "foo_delete_me") + Filelist.make_subdirs(RPath(Globals.local_connection, + "foo_delete_me/a/b/c/d")) + os.lstat("foo_delete_me") + os.lstat("foo_delete_me/a") + os.lstat("foo_delete_me/a/b") + os.lstat("foo_delete_me/a/b/c") + os.system("rm -rf foo_delete_me") + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/finaltest.py b/rdiff-backup/testing/finaltest.py new file mode 100644 index 0000000..c92a7d1 --- /dev/null +++ b/rdiff-backup/testing/finaltest.py @@ -0,0 +1,150 @@ +import unittest, os, re, sys +execfile("commontest.py") +rbexec("restore.py") + +"""Regression tests""" + +Globals.exclude_mirror_regexps = [re.compile(".*/rdiff-backup-data")] +Log.setverbosity(7) +Make() + +lc = Globals.local_connection + +class Local: + """This is just a place to put increments relative to the local + connection""" + def get_local_rp(extension): + return RPath(Globals.local_connection, "testfiles/" + extension) + + inc1rp = get_local_rp('increment1') + inc2rp = get_local_rp('increment2') + inc3rp = get_local_rp('increment3') + inc4rp = get_local_rp('increment4') + + rpout = get_local_rp('output') + rpout_inc = get_local_rp('output_inc') + rpout1 = get_local_rp('restoretarget1') + rpout2 = get_local_rp('restoretarget2') + rpout3 = get_local_rp('restoretarget3') + rpout4 = get_local_rp('restoretarget4') + + prefix = get_local_rp('.') + + vft_in = get_local_rp('vft_out') + vft_out = get_local_rp('increment2/various_file_types') + + timbar_in = get_local_rp('increment1/timbar.pyc') + timbar_out = get_local_rp('../timbar.pyc') # in cur directory + +class PathSetter(unittest.TestCase): + def setUp(self): + self.rb_schema = SourceDir + \ + "/rdiff-backup -v5 --remote-schema './chdir-wrapper %s' " + + def refresh(self, *rp_list): + """Reread data for the given rps""" + for rp in rp_list: rp.setdata() + + def set_connections(self, src_pre, src_back, dest_pre, dest_back): + """Set source and destination prefixes""" + if src_pre: self.src_prefix = "%s::%s" % (src_pre, src_back) + else: self.src_prefix = './' + + if dest_pre: self.dest_prefix = "%s::%s" % (dest_pre, dest_back) + else: self.dest_prefix = './' + + def exec_rb(self, *args): + """Run rdiff-backup on given arguments""" + arglist = [] + arglist.append(self.src_prefix + args[0]) + if len(args) > 1: + arglist.append(self.dest_prefix + args[1]) + assert len(args) == 2 + + cmdstr = self.rb_schema + ' '.join(arglist) + print "executing " + cmdstr + assert not os.system(cmdstr) + + def runtest(self): + # Deleting previous output + assert not os.system(MiscDir + '/myrm testfiles/output* ' + 'testfiles/restoretarget* testfiles/vft_out ' + 'timbar.pyc') + + # Backing up increment1 + self.exec_rb('testfiles/increment1', 'testfiles/output') + assert RPathStatic.cmp_recursive(Local.inc1rp, Local.rpout) + time.sleep(1) + + # Backing up increment2 + self.exec_rb('testfiles/increment2', 'testfiles/output') + assert RPathStatic.cmp_recursive(Local.inc2rp, Local.rpout) + time.sleep(1) + + # Backing up increment3 + self.exec_rb('testfiles/increment3', 'testfiles/output') + assert RPathStatic.cmp_recursive(Local.inc3rp, Local.rpout) + time.sleep(1) + + # Backing up increment4 + self.exec_rb('testfiles/increment4', 'testfiles/output') + assert RPathStatic.cmp_recursive(Local.inc4rp, Local.rpout) + + # Getting restore rps + inc_paths = self.getinc_paths("increments.", + "testfiles/output/rdiff-backup-data") + assert len(inc_paths) == 3 + + # Restoring increment1 + self.exec_rb(inc_paths[0], 'testfiles/restoretarget1') + assert RPathStatic.cmp_recursive(Local.inc1rp, Local.rpout1) + + # Restoring increment2 + self.exec_rb(inc_paths[1], 'testfiles/restoretarget2') + assert RPathStatic.cmp_recursive(Local.inc2rp, Local.rpout2) + + # Restoring increment3 + self.exec_rb(inc_paths[2], 'testfiles/restoretarget3') + assert RPathStatic.cmp_recursive(Local.inc3rp, Local.rpout3) + + # Test restoration of a few random files + vft_paths = self.getinc_paths("various_file_types.", + "testfiles/output/rdiff-backup-data/increments") + self.exec_rb(vft_paths[1], 'testfiles/vft_out') + self.refresh(Local.vft_in, Local.vft_out) + assert RPathStatic.cmp_recursive(Local.vft_in, Local.vft_out) + + timbar_paths = self.getinc_paths("timbar.pyc.", + "testfiles/output/rdiff-backup-data/increments") + self.exec_rb(timbar_paths[0]) + self.refresh(Local.timbar_in, Local.timbar_out) + assert RPath.cmp_with_attribs(Local.timbar_in, Local.timbar_out) + + # Make sure too many increment files not created + assert len(self.getinc_paths("nochange.", + "testfiles/output/rdiff-backup-data/increments")) == 0 + assert len(self.getinc_paths("", + "testfiles/output/rdiff-backup-data/increments/nochange")) == 0 + + def getinc_paths(self, basename, directory): + """Return increment.______.dir paths""" + incfiles = filter(lambda s: s.startswith(basename), + os.listdir(directory)) + incfiles.sort() + incrps = map(lambda f: RPath(lc, directory+"/"+f), incfiles) + return map(lambda x: x.path, filter(RPath.isincfile, incrps)) + + +class Final(PathSetter): + def testLocal(self): + """Run test sequence everything local""" + self.set_connections(None, None, None, None) + self.runtest() + + def testRemoteAll(self): + """Run test sequence everything remote""" + self.set_connections("test1/", '../', 'test2/tmp/', '../../') + self.runtest() + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/find-max-ram.py b/rdiff-backup/testing/find-max-ram.py new file mode 100755 index 0000000..41685f1 --- /dev/null +++ b/rdiff-backup/testing/find-max-ram.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +"""find-max-ram - Returns the maximum amount of memory used by a program. + +Every half second, run ps with the appropriate commands, getting the +size of the program. Return max value. + +""" + +import os, sys, time + +def get_val(cmdstr): + """Runs ps and gets sum rss for processes making cmdstr + + Returns None if process not found. + + """ + cmd = ("ps -Ao cmd -o rss | grep '%s' | grep -v grep" % cmdstr) +# print "Running ", cmd + fp = os.popen(cmd) + lines = fp.readlines() + fp.close() + + if not lines: return None + else: return reduce(lambda x,y: x+y, map(read_ps_line, lines)) + +def read_ps_line(psline): + """Given a specially formatted line by ps, return rss value""" + l = psline.split() + assert len(l) >= 2 # first few are name, last one is rss + return int(l[-1]) + + +def main(cmdstr): + while get_val(cmdstr) is None: time.sleep(0.5) + + current_max = 0 + while 1: + rss = get_val(cmdstr) + print rss + if rss is None: break + current_max = max(current_max, rss) + time.sleep(0.5) + + print current_max + + +if __name__=="__main__": + + if len(sys.argv) != 2: + print """Usage: find-max-ram [command string] + + It will then run ps twice a second and keep totalling how much RSS + (resident set size) the process(es) whose ps command name contain the + given string use up. When there are no more processes found, it will + print the number and exit. + """ + sys.exit(1) + else: main(sys.argv[1]) + diff --git a/rdiff-backup/testing/highleveltest.py b/rdiff-backup/testing/highleveltest.py new file mode 100644 index 0000000..b1e6f8d --- /dev/null +++ b/rdiff-backup/testing/highleveltest.py @@ -0,0 +1,75 @@ +import unittest + +execfile("commontest.py") +rbexec("setconnections.py") + + +class RemoteMirrorTest(unittest.TestCase): + """Test mirroring""" + def setUp(self): + """Start server""" + Log.setverbosity(7) + Globals.change_source_perms = 1 + self.conn = SetConnections.init_connection("./server.py") + + self.inrp = RPath(Globals.local_connection, "testfiles/various_file_types") + self.outrp = RPath(self.conn, "testfiles/output") + self.rbdir = RPath(self.conn, "testfiles/output/rdiff-backup-data") + SetConnections.UpdateGlobal('rbdir', self.rbdir) + self.inc1 = RPath(Globals.local_connection, "testfiles/increment1") + self.inc2 = RPath(Globals.local_connection, "testfiles/increment2") + self.inc3 = RPath(Globals.local_connection, "testfiles/increment3") + self.inc4 = RPath(Globals.local_connection, "testfiles/increment4") + + SetConnections.BackupInitConnections(Globals.local_connection, + self.conn) + SetConnections.UpdateGlobal('checkpoint_interval', 3) + + def testMirror(self): + """Testing simple mirror""" + if self.outrp.lstat(): self.outrp.delete() + HighLevel.Mirror(self.inrp, self.outrp, None) + self.outrp.setdata() + assert RPath.cmp_recursive(self.inrp, self.outrp) + + def testMirror2(self): + """Test mirror with larger data set""" + if self.outrp.lstat(): self.outrp.delete() + for rp in [self.inc1, self.inc2, self.inc3, self.inc4]: + rp.setdata() + print "----------------- Starting ", rp.path + HighLevel.Mirror(rp, self.outrp, None) + #if rp is self.inc2: assert 0 + assert RPath.cmp_recursive(rp, self.outrp) + self.outrp.setdata() + + def testMirrorWithCheckpointing(self): + """Like testMirror but this time checkpoint""" + if self.outrp.lstat(): self.outrp.delete() + self.outrp.mkdir() + self.rbdir.mkdir() + Globals.add_regexp("testfiles/output/rdiff-backup-data", 1) + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.inrp, self.outrp, 1) + self.outrp.setdata() + assert RPath.cmp_recursive(self.inrp, self.outrp) + + def testMirrorWithCheckpointing2(self): + """Larger data set""" + if self.outrp.lstat(): os.system(MiscDir+"/myrm %s" % self.outrp.path) + self.outrp.setdata() + self.outrp.mkdir() + self.rbdir.mkdir() + Globals.add_regexp("testfiles/output/rdiff-backup-data", 1) + Time.setcurtime() + SaveState.init_filenames(None) + for rp in [self.inc1, self.inc2, self.inc3, self.inc4]: + print "----------------- Starting ", rp.path + HighLevel.Mirror(rp, self.outrp, 1) + assert RPath.cmp_recursive(rp, self.outrp) + + def tearDown(self): SetConnections.CloseConnections() + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/incrementtest.py b/rdiff-backup/testing/incrementtest.py new file mode 100644 index 0000000..d6cd17f --- /dev/null +++ b/rdiff-backup/testing/incrementtest.py @@ -0,0 +1,100 @@ +import unittest, os + +execfile("commontest.py") +rbexec("increment.py") + + +lc = Globals.local_connection +Globals.change_source_perms = 1 +Log.setverbosity(7) + +def getrp(ending): + return RPath(lc, "testfiles/various_file_types/" + ending) + +rf = getrp("regular_file") +exec1 = getrp("executable") +exec2 = getrp("executable2") +sig = getrp("regular_file.sig") +hl1, hl2 = map(getrp, ["two_hardlinked_files1", "two_hardlinked_files2"]) +test = getrp("test") +dir = getrp(".") +sym = getrp("symbolic_link") +nothing = getrp("nothing") + +target = RPath(lc, "testfiles/out") + +Time.setprevtime(999424113.24931) +prevtimestr = "2001-09-02T02:48:33-07:00" +t_pref = "testfiles/out.2001-09-02T02:48:33-07:00" +t_diff = "testfiles/out.2001-09-02T02:48:33-07:00.diff" + +class inctest(unittest.TestCase): + """Test the incrementRP function""" + def setUp(self): + pass + + def testreg(self): + """Test increment of regular files""" + Inc.Increment(rf, exec1, target) + rpd = RPath(lc, t_diff) + assert rpd.isreg() + assert RPath.cmp_attribs(rpd, exec1) + rpd.delete() + + def testmissing(self): + """Test creation of missing files""" + Inc.Increment(rf, nothing, target) + rp = RPath(lc, t_pref + ".missing") + assert rp.lstat() + rp.delete() + + def testsnapshot(self): + """Test making of a snapshot""" + Inc.Increment(rf, sym, target) + rp = RPath(lc, t_pref + ".snapshot") + assert rp.lstat() + assert RPath.cmp_attribs(rp, sym) + assert RPath.cmp(rp, sym) + rp.delete() + + Inc.Increment(sym, rf, target) + rp = RPath(lc, t_pref + ".snapshot") + assert rp.lstat() + assert RPath.cmp_attribs(rp, rf) + assert RPath.cmp(rp, rf) + rp.delete() + + def testdir(self): + """Test increment on dir""" + Inc.Increment(sym, dir, target) + rp = RPath(lc, t_pref + ".dir") + rp2 = RPath(lc, t_pref) + assert rp.lstat() + assert target.isdir() + assert RPath.cmp_attribs(dir, rp) + assert rp.isreg() + rp.delete() + target.delete() + + +inc1rp = RPath(lc, "testfiles/increment1") +inc2rp = RPath(lc, "testfiles/increment2") +inc3rp = RPath(lc, "testfiles/increment3") +inc4rp = RPath(lc, "testfiles/increment4") +rpout = RPath(lc, "testfiles/output") + +#class IncTreeTest(unittest.TestCase): +# def setUp(self): +# os.system("./myrm testfiles/output*") + +# def testinctree(self): +# """Test tree incrementing""" +# rpt1 = RPTriple(inc2rp, inc1rp, rpout) +# rpt2 = RPTriple(inc3rp, inc2rp, rpout) +# rpt3 = RPTriple(inc4rp, inc3rp, rpout) +# for rpt in [rpt1, rpt2, rpt3]: +# Time.setprevtime(Time.prevtime + 10000) +# Inc.IncrementTTree(TripleTree(rpt).destructive_stepping()) +# Time.setprevtime(999424113.24931) + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/iterfiletest.py b/rdiff-backup/testing/iterfiletest.py new file mode 100644 index 0000000..38dca4d --- /dev/null +++ b/rdiff-backup/testing/iterfiletest.py @@ -0,0 +1,27 @@ +import unittest, StringIO +execfile("commontest.py") +rbexec("iterfile.py") + + +class testIterFile(unittest.TestCase): + def setUp(self): + self.iter1maker = lambda: iter(range(50)) + self.iter2maker = lambda: iter(map(str, range(50))) + + def testConversion(self): + """Test iter to file conversion""" + for itm in [self.iter1maker, self.iter2maker]: + assert Iter.equal(itm(), + IterWrappingFile(FileWrappingIter(itm()))) + +class testBufferedRead(unittest.TestCase): + def testBuffering(self): + """Test buffering a StringIO""" + fp = StringIO.StringIO("12345678"*10000) + bfp = BufferedRead(fp) + assert bfp.read(5) == "12345" + assert bfp.read(4) == "6781" + assert len(bfp.read(75000)) == 75000 + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/lazytest.py b/rdiff-backup/testing/lazytest.py new file mode 100644 index 0000000..fb464a1 --- /dev/null +++ b/rdiff-backup/testing/lazytest.py @@ -0,0 +1,326 @@ +from __future__ import generators +import unittest + +execfile("commontest.py") +rbexec("lazy.py") + +class Iterators(unittest.TestCase): + one_to_100 = lambda s: iter(range(1, 101)) + evens = lambda s: iter(range(2, 101, 2)) + odds = lambda s: iter(range(1, 100, 2)) + empty = lambda s: iter([]) + + def __init__(self, *args): + apply (unittest.TestCase.__init__, (self,) + args) + self.falseerror = self.falseerror_maker() + self.trueerror = self.trueerror_maker() + self.emptygen = self.emptygen_maker() + self.typeerror = self.typeerror_maker() + self.nameerror = self.nameerror_maker() + + def falseerror_maker(self): + yield None + yield 0 + yield [] + raise Exception + + def trueerror_maker(self): + yield 1 + yield "hello" + yield (2, 3) + raise Exception + + def nameerror_maker(self): + if 0: yield 1 + raise NameError + + def typeerror_maker(self): + yield 1 + yield 2 + raise TypeError + + def alwayserror(self, x): + raise Exception + + def emptygen_maker(self): + if 0: yield 1 + + +class IterEqualTestCase(Iterators): + """Tests for iter_equal function""" + def testEmpty(self): + """Empty iterators should be equal""" + assert Iter.equal(self.empty(), iter([])) + + def testNormal(self): + """See if normal iterators are equal""" + assert Iter.equal(iter((1,2,3)), iter((1,2,3))) + assert Iter.equal(self.odds(), iter(range(1, 100, 2))) + assert Iter.equal(iter((1,2,3)), iter(range(1, 4))) + + def testNormalInequality(self): + """See if normal unequals work""" + assert not Iter.equal(iter((1,2,3)), iter((1,2,4))) + assert not Iter.equal(self.odds(), iter(["hello", "there"])) + + def testGenerators(self): + """equals works for generators""" + def f(): + yield 1 + yield "hello" + def g(): + yield 1 + yield "hello" + assert Iter.equal(f(), g()) + + def testLength(self): + """Differently sized iterators""" + assert not Iter.equal(iter((1,2,3)), iter((1,2))) + assert not Iter.equal(iter((1,2)), iter((1,2,3))) + + +class FilterTestCase(Iterators): + """Tests for lazy_filter function""" + def testEmpty(self): + """empty iterators -> empty iterators""" + assert Iter.empty(Iter.filter(self.alwayserror, + self.empty())), \ + "Filtering an empty iterator should result in empty iterator" + + def testNum1(self): + """Test numbers 1 - 100 #1""" + assert Iter.equal(Iter.filter(lambda x: x % 2 == 0, + self.one_to_100()), + self.evens()) + assert Iter.equal(Iter.filter(lambda x: x % 2, + self.one_to_100()), + self.odds()) + + def testError(self): + """Should raise appropriate error""" + i = Iter.filter(lambda x: x, self.falseerror_maker()) + self.assertRaises(Exception, i.next) + + +class MapTestCase(Iterators): + """Test mapping of iterators""" + def testNumbers(self): + """1 to 100 * 2 = 2 to 200""" + assert Iter.equal(Iter.map(lambda x: 2*x, self.one_to_100()), + iter(range(2, 201, 2))) + + def testShortcut(self): + """Map should go in order""" + def f(x): + if x == "hello": + raise NameError + i = Iter.map(f, self.trueerror_maker()) + i.next() + self.assertRaises(NameError, i.next) + + def testEmpty(self): + """Map of an empty iterator is empty""" + assert Iter.empty(Iter.map(lambda x: x, iter([]))) + + +class CatTestCase(Iterators): + """Test concatenation of iterators""" + def testEmpty(self): + """Empty + empty = empty""" + assert Iter.empty(Iter.cat(iter([]), iter([]))) + + def testNumbers(self): + """1 to 50 + 51 to 100 = 1 to 100""" + assert Iter.equal(Iter.cat(iter(range(1, 51)), iter(range(51, 101))), + self.one_to_100()) + + def testShortcut(self): + """Process iterators in order""" + i = Iter.cat(self.typeerror_maker(), self.nameerror_maker()) + i.next() + i.next() + self.assertRaises(TypeError, i.next) + + +class AndOrTestCase(Iterators): + """Test And and Or""" + def testEmpty(self): + """And() -> true, Or() -> false""" + assert Iter.And(self.empty()) + assert not Iter.Or(self.empty()) + + def testAndShortcut(self): + """And should return if any false""" + assert Iter.And(self.falseerror_maker()) is None + + def testOrShortcut(self): + """Or should return if any true""" + assert Iter.Or(self.trueerror_maker()) == 1 + + def testNormalAnd(self): + """And should go through true iterators, picking last""" + assert Iter.And(iter([1,2,3,4])) == 4 + self.assertRaises(Exception, Iter.And, self.trueerror_maker()) + + def testNormalOr(self): + """Or goes through false iterators, picking last""" + assert Iter.Or(iter([0, None, []])) == [] + self.assertRaises(Exception, Iter.Or, self.falseerror_maker()) + + +class FoldingTest(Iterators): + """Test folding operations""" + def f(self, x, y): return x + y + + def testEmpty(self): + """Folds of empty iterators should produce defaults""" + assert Iter.foldl(self.f, 23, self.empty()) == 23 + assert Iter.foldr(self.f, 32, self.empty()) == 32 + + def testAddition(self): + """Use folds to sum lists""" + assert Iter.foldl(self.f, 0, self.one_to_100()) == 5050 + assert Iter.foldr(self.f, 0, self.one_to_100()) == 5050 + + def testLargeAddition(self): + """Folds on 10000 element iterators""" + assert Iter.foldl(self.f, 0, iter(range(1, 10001))) == 50005000 + self.assertRaises(RuntimeError, + Iter.foldr, self.f, 0, iter(range(1, 10001))) + + def testLen(self): + """Use folds to calculate length of lists""" + assert Iter.foldl(lambda x, y: x+1, 0, self.evens()) == 50 + assert Iter.foldr(lambda x, y: y+1, 0, self.odds()) == 50 + +class MultiplexTest(Iterators): + def testSingle(self): + """Test multiplex single stream""" + i_orig = self.one_to_100() + i2_orig = self.one_to_100() + i = Iter.multiplex(i_orig, 1)[0] + assert Iter.equal(i, i2_orig) + + def testTrible(self): + """Test splitting iterator into three""" + counter = [0] + def ff(x): counter[0] += 1 + i_orig = self.one_to_100() + i2_orig = self.one_to_100() + i1, i2, i3 = Iter.multiplex(i_orig, 3, ff) + assert Iter.equal(i1, i2) + assert Iter.equal(i3, i2_orig) + assert counter[0] == 100, counter + + def testDouble(self): + """Test splitting into two...""" + i1, i2 = Iter.multiplex(self.one_to_100(), 2) + assert Iter.equal(i1, self.one_to_100()) + assert Iter.equal(i2, self.one_to_100()) + + +class index: + """This is just used below to test the iter tree reducer""" + def __init__(self, index): + self.index = index + + +class TreeReducerTest(unittest.TestCase): + def setUp(self): + self.i1 = iter(map(index, [(), (1,), (2,), (3,)])) + self.i2 = iter(map(index, [(0,), (0,1), (0,1,0), (0,1,1), + (0,2), (0,2,1), (0,3)])) + + self.i1a = iter(map(index, [(), (1,)])) + self.i1b = iter(map(index, [(2,), (3,)])) + self.i2a = iter(map(index, [(0,), (0,1), (0,1,0)])) + self.i2b = iter(map(index, [(0,1,1), (0,2)])) + self.i2c = iter(map(index, [(0,2,1), (0,3)])) + + # The four following are used to make an ITR later + def number_of_index(self, index_obj): + if not index_obj.index: return 0 + else: return index_obj.index[-1] + + def sum_index(self, index_obj): + return reduce(lambda x,y: x+y, index_obj.index, 0) + + def add2(self, x, y): + #print "adding %d and %d" % (x,y) + return x+y + + def add3(self, x,y,z): + #print "ignoring %s, adding %d and %d" % (x,y,z) + return y+z + + def testTreeReducer(self): + """testing IterTreeReducer""" + itm = IterTreeReducer(self.number_of_index, self.add2, 0, self.add3) + for elem in self.i1: + val = itm(elem) + assert val, elem.index + itm.calculate_final_val() + assert itm.getresult() == 6, itm.getresult() + + itm2 = IterTreeReducer(self.sum_index, self.add2, 0, self.add3) + for elem in self.i2: + val = itm2(elem) + if elem.index == (): assert not val + else: assert val + assert itm2.getresult() == 12, itm2.getresult() + + def testTreeReducerState(self): + """Test saving and recreation of an IterTreeReducer""" + itm1a = IterTreeReducer(self.number_of_index, self.add2, 0, self.add3) + for elem in self.i1a: + val = itm1a(elem) + assert val, elem.index + itm1b = IterTreeReducer(self.number_of_index, self.add2, 0, self.add3, + itm1a.getstate()) + for elem in self.i1b: + val = itm1b(elem) + assert val, elem.index + itm1b.calculate_final_val() + assert itm1b.getresult() == 6, itm1b.getresult() + + itm2a = IterTreeReducer(self.sum_index, self.add2, 0, self.add3) + for elem in self.i2a: + val = itm2a(elem) + if elem.index == (): assert not val + else: assert val + itm2b = IterTreeReducer(self.sum_index, self.add2, 0, self.add3, + itm2a.getstate()) + for elem in self.i2b: + val = itm2b(elem) + if elem.index == (): assert not val + else: assert val + itm2c = IterTreeReducer(self.sum_index, self.add2, 0, self.add3, + itm2b.getstate()) + for elem in self.i2c: + val = itm2c(elem) + if elem.index == (): assert not val + else: assert val + assert itm2c.getresult() == 12, itm2c.getresult() + + def testTreeReducer2(self): + """Another test of the tree reducer""" + assert Iter.len(self.i1) == 4 + + hit_021_02 = [None, None] + def helper(indexobj, elem_init, branch_result): + if indexobj.index == (0,2): + assert hit_021_02[0] + hit_021_02[1] = 1 + elif indexobj.index == (0,2,1): + assert not hit_021_02[1] + hit_021_02[0] = 1 + return None + itm = IterTreeReducer(lambda x: None, lambda x,y: None, None, helper) + + for elem in self.i2: itm(elem) + itm.getresult() + assert hit_021_02 == [1,1] + + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/rdifftest.py b/rdiff-backup/testing/rdifftest.py new file mode 100644 index 0000000..471eab7 --- /dev/null +++ b/rdiff-backup/testing/rdifftest.py @@ -0,0 +1,127 @@ +import unittest, random + +execfile("commontest.py") +rbexec("destructive_stepping.py") + + +Log.setverbosity(6) + +def MakeRandomFile(path): + """Writes a random file of length between 10000 and 100000""" + fp = open(path, "w") + randseq = [] + for i in xrange(random.randrange(5000, 30000)): + randseq.append(chr(random.randrange(256))) + fp.write("".join(randseq)) + fp.close() + + +class RdiffTest(unittest.TestCase): + """Test rdiff""" + lc = Globals.local_connection + basis = RPath(lc, "testfiles/basis") + new = RPath(lc, "testfiles/new") + output = RPath(lc, "testfiles/output") + delta = RPath(lc, "testfiles/delta") + signature = RPath(lc, "testfiles/signature") + + def testRdiffSig(self): + """Test making rdiff signatures""" + sig = RPath(self.lc, "testfiles/various_file_types/regular_file.sig") + sigfp = sig.open("r") + rfsig = Rdiff.get_signature(RPath(self.lc, "testfiles/various_file_types/regular_file")) + assert RPath.cmpfileobj(sigfp, rfsig) + sigfp.close() + rfsig.close() + + def testRdiffDeltaPatch(self): + """Test making deltas and patching files""" + rplist = [self.basis, self.new, self.delta, + self.signature, self.output] + for rp in rplist: + if rp.lstat(): rp.delete() + + for i in range(2): + MakeRandomFile(self.basis.path) + MakeRandomFile(self.new.path) + map(RPath.setdata, [self.basis, self.new]) + assert self.basis.lstat() and self.new.lstat() + self.signature.write_from_fileobj(Rdiff.get_signature(self.basis)) + assert self.signature.lstat() + self.delta.write_from_fileobj(Rdiff.get_delta(self.signature, + self.new)) + assert self.delta.lstat() + Rdiff.patch_action(self.basis, self.delta, self.output).execute() + assert RPath.cmp(self.new, self.output) + map(RPath.delete, rplist) + + def testWriteDelta(self): + """Test write delta feature of rdiff""" + rplist = [self.basis, self.new, self.delta, self.output] + MakeRandomFile(self.basis.path) + MakeRandomFile(self.new.path) + map(RPath.setdata, [self.basis, self.new]) + assert self.basis.lstat() and self.new.lstat() + + Rdiff.write_delta(self.basis, self.new, self.delta) + assert self.delta.lstat() + Rdiff.patch_action(self.basis, self.delta, self.output).execute() + assert RPath.cmp(self.new, self.output) + map(RPath.delete, rplist) + + def testRdiffRename(self): + """Rdiff replacing original file with patch outfile""" + rplist = [self.basis, self.new, self.delta, self.signature] + for rp in rplist: + if rp.lstat(): rp.delete() + + MakeRandomFile(self.basis.path) + MakeRandomFile(self.new.path) + map(RPath.setdata, [self.basis, self.new]) + assert self.basis.lstat() and self.new.lstat() + self.signature.write_from_fileobj(Rdiff.get_signature(self.basis)) + assert self.signature.lstat() + self.delta.write_from_fileobj(Rdiff.get_delta(self.signature, + self.new)) + assert self.delta.lstat() + Rdiff.patch_action(self.basis, self.delta).execute() + assert RPath.cmp(self.basis, self.new) + map(RPath.delete, rplist) + + def testCopy(self): + """Using rdiff to copy two files""" + rplist = [self.basis, self.new] + for rp in rplist: + if rp.lstat(): rp.delete() + + MakeRandomFile(self.basis.path) + MakeRandomFile(self.new.path) + map(RPath.setdata, rplist) + Rdiff.copy_action(self.basis, self.new).execute() + assert RPath.cmp(self.basis, self.new) + map(RPath.delete, rplist) + + def testPatchWithAttribs(self): + """Using rdiff to copy two files with attributes""" + rplist = [self.basis, self.new, self.delta] + for rp in rplist: + if rp.lstat(): rp.delete() + + MakeRandomFile(self.basis.path) + MakeRandomFile(self.new.path) + self.new.chmod(0401) + map(RPath.setdata, rplist) + Rdiff.write_delta(self.basis, self.new, self.delta) + RPath.copy_attribs(self.new, self.delta) + assert self.delta.getperms() == 0401 + + assert not self.basis == self.new + Rdiff.patch_with_attribs_action(self.basis, self.delta).execute() + if not self.basis == self.new: + print self.basis, self.new + assert 0 + map(RPath.delete, rplist) + + +if __name__ == '__main__': + unittest.main() diff --git a/rdiff-backup/testing/regressiontest.py b/rdiff-backup/testing/regressiontest.py new file mode 100644 index 0000000..5d4d27e --- /dev/null +++ b/rdiff-backup/testing/regressiontest.py @@ -0,0 +1,410 @@ +import unittest, os + +execfile("commontest.py") +rbexec("setconnections.py") + + +"""Regression tests + +This one must be run in the rdiff-backup directory, as it requres +chdir-wrapper, the various rdiff-backup files, and the directory +testfiles +""" + +Globals.set('change_source_perms', 1) +Globals.counter = 0 +Log.setverbosity(7) + +class Local: + """This is just a place to put increments relative to the local + connection""" + def get_local_rp(extension): + return RPath(Globals.local_connection, "testfiles/" + extension) + + inc1rp = get_local_rp('increment1') + inc2rp = get_local_rp('increment2') + inc3rp = get_local_rp('increment3') + inc4rp = get_local_rp('increment4') + + rpout = get_local_rp('output') + rpout_inc = get_local_rp('output_inc') + rpout1 = get_local_rp('restoretarget1') + rpout2 = get_local_rp('restoretarget2') + rpout3 = get_local_rp('restoretarget3') + rpout4 = get_local_rp('restoretarget4') + + noperms = get_local_rp('noperms') + noperms_out = get_local_rp('noperms_output') + + rootfiles = get_local_rp('root') + rootfiles2 = get_local_rp('root2') + rootfiles21 = get_local_rp('root2.1') + rootfiles_out = get_local_rp('root_output') + rootfiles_out2 = get_local_rp('root_output2') + + prefix = get_local_rp('.') + + +class PathSetter(unittest.TestCase): + def get_prefix_and_conn(self, path, return_path): + """Return (prefix, connection) tuple""" + if path: + return (return_path, + SetConnections.init_connection("python ./chdir-wrapper "+path)) + else: return ('./', Globals.local_connection) + + def get_src_rp(self, path): + return RPath(self.src_conn, self.src_prefix + path) + + def get_dest_rp(self, path): + return RPath(self.dest_conn, self.dest_prefix + path) + + def set_rbdir(self, rpout): + """Create rdiff-backup-data dir if not already, tell everyone""" + self.rbdir = self.rpout.append('rdiff-backup-data') + self.rpout.mkdir() + self.rbdir.mkdir() + SetConnections.UpdateGlobal('rbdir', self.rbdir) + + # Better safe than sorry - cover all possibilities + Globals.add_regexp("testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("./testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("../testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("../../testfiles/output/rdiff-backup-data", 1) + + def setPathnames(self, src_path, src_return, dest_path, dest_return): + """Start servers which will run in src_path and dest_path respectively + + If either is None, then no server will be run and local + process will handle that end. src_return and dest_return are + the prefix back to the original rdiff-backup directory. So + for instance is src_path is "test2/tmp", then src_return will + be '../'. + + """ + # Clear old data that may rely on deleted connections + Globals.isbackup_writer = None + Globals.isbackup_reader = None + Globals.rbdir = None + + print "Setting up connection" + self.src_prefix, self.src_conn = \ + self.get_prefix_and_conn(src_path, src_return) + self.dest_prefix, self.dest_conn = \ + self.get_prefix_and_conn(dest_path, dest_return) + SetConnections.BackupInitConnections(self.src_conn, self.dest_conn) + + os.system(MiscDir+"/myrm testfiles/output* testfiles/restoretarget* " + "testfiles/noperms_output testfiles/root_output " + "testfiles/unreadable_out") + self.inc1rp = self.get_src_rp("testfiles/increment1") + self.inc2rp = self.get_src_rp('testfiles/increment2') + self.inc3rp = self.get_src_rp('testfiles/increment3') + self.inc4rp = self.get_src_rp('testfiles/increment4') + + self.rpout_inc = self.get_dest_rp('testfiles/output_inc') + self.rpout1 = self.get_dest_rp('testfiles/restoretarget1') + self.rpout2 = self.get_dest_rp('testfiles/restoretarget2') + self.rpout3 = self.get_dest_rp('testfiles/restoretarget3') + self.rpout4 = self.get_dest_rp('testfiles/restoretarget4') + + self.rpout = self.get_dest_rp('testfiles/output') + self.set_rbdir(self.rpout) + + self.noperms = self.get_src_rp('testfiles/noperms') + self.noperms_out = self.get_dest_rp('testfiles/noperms_output') + + self.rootfiles = self.get_src_rp('testfiles/root') + self.rootfiles_out = self.get_dest_rp('testfiles/root_output') + self.rootfiles2 = self.get_src_rp('testfiles/root2') + self.rootfiles21 = self.get_src_rp('testfiles/root2.1') + self.rootfiles_out2 = self.get_dest_rp('testfiles/root_output2') + + self.one_unreadable = self.get_src_rp('testfiles/one_unreadable') + self.one_unreadable_out = self.get_dest_rp('testfiles/unreadable_out') + + def tearDown(self): + print "Taking down connections" + SetConnections.CloseConnections() + + +class IncrementTest(PathSetter): + def testLocalinc(self): + """Test self.incrementing, and then restoring, local""" + self.setPathnames(None, None, None, None) + self.runtest() + + def test_remote_src(self): + """Increment/Restore when source directory is remote""" + self.setPathnames('test1', '../', None, None) + self.runtest() + + def test_remote_dest(self): + """Increment/Restore when target directory is remote""" + self.setPathnames(None, None, 'test2', '../') + self.runtest() + + def test_remote_both(self): + """Increment/Restore when both directories are remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + self.runtest() + + def OldtestRecoveryLocal(self): + """Test to see if rdiff-backup can continue with bad increment""" + os.system(MiscDir+'/myrm testfiles/recovery_out_backup') + self.setPathnames(None, None, None, None) + Time.setprevtime(1006136450) + Time.setcurtime() + Globals.add_regexp('.*rdiff-backup-data', 1) + os.system('cp -a testfiles/recovery_out testfiles/recovery_out_backup') + recovery_in = self.get_src_rp('testfiles/recovery') + recovery_out = self.get_dest_rp('testfiles/recovery_out_backup') + recovery_inc = self.get_dest_rp('testfiles/recovery_out_backup/' + 'rdiff-backup-data/increments') + HighLevel.Mirror_and_increment(recovery_in, recovery_out, + recovery_inc) + # Should probably check integrity of increments, but for now + # allow if it doesn't during the Mirror_and_increment + + def OldtestRecoveryRemote(self): + """Test Recovery with both connections remote""" + os.system(MiscDir+'/myrm testfiles/recovery_out_backup') + self.setPathnames('test1', '../', 'test2/tmp', '../../') + Time.setprevtime(1006136450) + Time.setcurtime() + Globals.add_regexp('.*rdiff-backup-data', 1) + os.system('cp -a testfiles/recovery_out testfiles/recovery_out_backup') + recovery_in = self.get_src_rp('testfiles/recovery') + recovery_out = self.get_dest_rp('testfiles/recovery_out_backup') + recovery_inc = self.get_dest_rp('testfiles/recovery_out_backup/' + 'rdiff-backup-data/increments') + HighLevel.Mirror_and_increment(recovery_in, recovery_out, + recovery_inc) + # Should probably check integrity of increments, but for now + # allow if it doesn't during the Mirror_and_increment + + def runtest(self): + """After setting connections, etc., run actual test using this""" + Time.setcurtime() + SaveState.init_filenames(1) + HighLevel.Mirror(self.inc1rp, self.rpout) + assert RPath.cmp_recursive(Local.inc1rp, Local.rpout) + + Time.setcurtime() + Time.setprevtime(999500000) + HighLevel.Mirror_and_increment(self.inc2rp, self.rpout, self.rpout_inc) + assert RPath.cmp_recursive(Local.inc2rp, Local.rpout) + + Time.setcurtime() + Time.setprevtime(999510000) + HighLevel.Mirror_and_increment(self.inc3rp, self.rpout, self.rpout_inc) + assert RPath.cmp_recursive(Local.inc3rp, Local.rpout) + + Time.setcurtime() + Time.setprevtime(999520000) + HighLevel.Mirror_and_increment(self.inc4rp, self.rpout, self.rpout_inc) + assert RPath.cmp_recursive(Local.inc4rp, Local.rpout) + + + print "Restoring to self.inc4" + HighLevel.Restore(999530000, self.rpout, self.get_inctup(), + self.rpout4) + assert RPath.cmp_recursive(Local.inc4rp, Local.rpout4) + + print "Restoring to self.inc3" + HighLevel.Restore(999520000, self.rpout, self.get_inctup(), + self.rpout3) + assert RPath.cmp_recursive(Local.inc3rp, Local.rpout3) + + print "Restoring to self.inc2" + HighLevel.Restore(999510000, self.rpout, self.get_inctup(), + self.rpout2) + assert RPath.cmp_recursive(Local.inc2rp, Local.rpout2) + + print "Restoring to self.inc1" + HighLevel.Restore(999500000, self.rpout, self.get_inctup(), + self.rpout1) + assert RPath.cmp_recursive(Local.inc1rp, Local.rpout1) + + def get_inctup(self): + """Return inc tuples as expected by Restore.RestoreRecursive + + Assumes output increment directory is + testfiles/output_inc._____. + + """ + filenames = filter(lambda x: x.startswith("output_inc."), + Local.prefix.listdir()) + rplist = map(lambda x: Local.prefix.append(x), filenames) + return IndexedTuple((), (Local.prefix.append("output_inc"), rplist)) + + +class MirrorTest(PathSetter): + """Test some mirroring functions""" + def testLocalMirror(self): + """Test Local mirroring""" + self.setPathnames(None, None, None, None) + self.runtest() + + def testPartialLocalMirror(self): + """Test updating an existing directory""" + self.setPathnames(None, None, None, None) + self.run_partial_test() + + def testRemoteMirror(self): + """Mirroring when destination is remote""" + self.setPathnames(None, None, 'test1', '../') + self.runtest() + + def testPartialRemoteMirror(self): + """Partial mirroring when destination is remote""" + self.setPathnames(None, None, 'test1', '../') + self.run_partial_test() + + def testSourceRemoteMirror(self): + """Mirroring when source is remote""" + self.setPathnames('test2', '../', None, None) + self.runtest() + + def testPartialSourceRemoteMirror(self): + """Partial Mirroring when source is remote""" + self.setPathnames('test2', '../', None, None) + self.run_partial_test() + + def testBothRemoteMirror(self): + """Mirroring when both directories are remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + self.runtest() + + def testPartialBothRemoteMirror(self): + """Partial mirroring when both directories are remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + self.run_partial_test() + + def testNopermsLocal(self): + "Test mirroring a directory that has no permissions" + self.setPathnames(None, None, None, None) + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.noperms, self.noperms_out, None) + # Can't compare because we don't have the permissions to do it right + #assert RPath.cmp_recursive(Local.noperms, Local.noperms_out) + + def testNopermsRemote(self): + "No permissions mirroring (remote)" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.noperms, self.noperms_out, checkpoint=None) + #assert RPath.cmp_recursive(Local.noperms, Local.noperms_out) + + def testPermSkipLocal(self): + """Test to see if rdiff-backup will skip unreadable files""" + self.setPathnames(None, None, None, None) + Globals.change_source_perms = None + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.one_unreadable, self.one_unreadable_out, checkpoint=None) + Globals.change_source_perms = 1 + HighLevel.Mirror(self.one_unreadable, self.one_unreadable_out) + # Could add test, but for now just make sure it doesn't exit + + def testPermSkipRemote(self): + """Test skip of unreadable files remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + Globals.change_source_perms = None + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.one_unreadable, self.one_unreadable_out) + Globals.change_source_perms = 1 + HighLevel.Mirror(self.one_unreadable, self.one_unreadable_out) + # Could add test, but for now just make sure it doesn't exit + + def refresh(self, *rps): + for rp in rps: rp.setdata() + + def _testRootLocal(self): + """Test mirroring a directory with dev files and different owners""" + self.setPathnames(None, None, None, None) + Globals.change_ownership = 1 + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # add uid/gid info + HighLevel.Mirror(self.rootfiles, self.rootfiles_out) + assert RPath.cmp_recursive(Local.rootfiles, Local.rootfiles_out) + Globals.change_ownership = None + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # remove that info + + def _testRootRemote(self): + """Mirroring root files both ends remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + for conn in Globals.connections: + conn.Globals.set('change_ownership', 1) + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # add uid/gid info + HighLevel.Mirror(self.rootfiles, self.rootfiles_out) + assert RPath.cmp_recursive(Local.rootfiles, Local.rootfiles_out) + for coon in Globals.connections: + conn.Globals.set('change_ownership', None) + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # remove that info + + def testRoot2Local(self): + """Make sure we can backup a directory we don't own""" + self.setPathnames(None, None, None, None) + Globals.change_ownership = Globals.change_source_perms = None + self.refresh(self.rootfiles2, self.rootfiles_out2, + Local.rootfiles2, Local.rootfiles_out2) # add uid/gid info + HighLevel.Mirror(self.rootfiles2, self.rootfiles_out2) + assert RPath.cmp_recursive(Local.rootfiles2, Local.rootfiles_out2) + self.refresh(self.rootfiles2, self.rootfiles_out2, + Local.rootfiles2, Local.rootfiles_out2) # remove that info + HighLevel.Mirror(self.rootfiles21, self.rootfiles_out2) + assert RPath.cmp_recursive(Local.rootfiles21, Local.rootfiles_out2) + self.refresh(self.rootfiles21, self.rootfiles_out2, + Local.rootfiles21, Local.rootfiles_out2) # remove that info + Globals.change_source_perms = 1 + + def deleteoutput(self): + os.system(MiscDir+"/myrm testfiles/output*") + self.rbdir = self.rpout.append('rdiff-backup-data') + self.rpout.mkdir() + self.rbdir.mkdir() + self.reset_rps() + + def reset_rps(self): + """Use after external changes made, to update the rps""" + for rp in [self.rpout, Local.rpout, + self.rpout_inc, Local.rpout_inc, + self.rpout1, Local.rpout1, + self.rpout2, Local.rpout2, + self.rpout3, Local.rpout3, + self.rpout4, Local.rpout4]: + rp.setdata() + + def runtest(self): + Time.setcurtime() + SaveState.init_filenames(None) + assert self.rbdir.lstat() + HighLevel.Mirror(self.inc1rp, self.rpout) + assert RPath.cmp_recursive(Local.inc1rp, Local.rpout) + + self.deleteoutput() + + HighLevel.Mirror(self.inc2rp, self.rpout) + assert RPath.cmp_recursive(Local.inc2rp, Local.rpout) + + def run_partial_test(self): + os.system("cp -a testfiles/increment3 testfiles/output") + self.reset_rps() + + Time.setcurtime() + SaveState.init_filenames(None) + HighLevel.Mirror(self.inc1rp, self.rpout) + #RPath.copy_attribs(self.inc1rp, self.rpout) + assert RPath.cmp_recursive(Local.inc1rp, Local.rpout) + + HighLevel.Mirror(self.inc2rp, self.rpout) + assert RPath.cmp_recursive(Local.inc2rp, Local.rpout) + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/restoretest.py b/rdiff-backup/testing/restoretest.py new file mode 100644 index 0000000..7672e1a --- /dev/null +++ b/rdiff-backup/testing/restoretest.py @@ -0,0 +1,47 @@ +import unittest + +execfile("commontest.py") +rbexec("restore.py") + + +lc = Globals.local_connection + +class RestoreTest(unittest.TestCase): + """Test Restore class""" + prefix = "testfiles/restoretest/" + def maketesttuples(self, basename): + """Make testing tuples from available files starting with prefix""" + dirlist = os.listdir(self.prefix) + baselist = filter(lambda f: f.startswith(basename), dirlist) + rps = map(lambda f: RPath(lc, self.prefix+f), baselist) + incs = filter(lambda rp: rp.isincfile(), rps) + tuples = map(lambda rp: (rp, RPath(lc, "%s.%s" % + (rp.getincbase().path, + rp.getinctime()))), + incs) + return tuples, incs + + def restoreonefiletest(self, basename): + tuples, incs = self.maketesttuples(basename) + rpbase = RPath(lc, self.prefix + basename) + rptarget = RPath(lc, "testfiles/outfile") + + if rptarget.lstat(): rptarget.delete() + for pair in tuples: + print "Processing file " + pair[0].path + rest_time = Time.stringtotime(pair[0].getinctime()) + Restore.RestoreFile(rest_time, rpbase, incs, rptarget) + if not rptarget.lstat(): assert not pair[1].lstat() + else: + assert RPath.cmp(rptarget, pair[1]), \ + "%s %s" % (rptarget.path, pair[1].path) + assert RPath.cmp_attribs(rptarget, pair[1]), \ + "%s %s" % (rptarget.path, pair[1].path) + rptarget.delete() + + def testRestorefiles(self): + """Testing restoration of files one at a time""" + map(self.restoreonefiletest, ["ocaml", "mf"]) + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/rlisttest.py b/rdiff-backup/testing/rlisttest.py new file mode 100644 index 0000000..2fe258b --- /dev/null +++ b/rdiff-backup/testing/rlisttest.py @@ -0,0 +1,98 @@ +import unittest +execfile("commontest.py") +rbexec("rlist.py") + +class BasicObject: + """The simplest object that can be used with RList""" + def __init__(self, i): + self.index = i + self.data = "This is object # %d" % i + + def __eq__(self, other): + return self.index == other.index and self.data == other.data + +l1_pre = filter(lambda x: x != 342 and not x in [650, 651, 652] and + x != 911 and x != 987, + range(1, 1001)) +l2_pre = filter(lambda x: not x in [222, 223, 224, 225] and x != 950 + and x != 999 and x != 444, + range(1, 1001)) + +l1 = map(BasicObject, l1_pre) +l2 = map(BasicObject, l2_pre) +combined = map(BasicObject, range(1, 1001)) + +def lmaphelper2((x, i)): + """Return difference triple to say that index x only in list # i""" + if i == 1: return (BasicObject(x), None) + elif i == 2: return (None, BasicObject(x)) + else: assert 0, "Invalid parameter %s for i" % i + +difference1 = map(lmaphelper2, [(222, 1), (223, 1), (224, 1), (225, 1), + (342, 2), (444, 1), (650, 2), (651, 2), + (652, 2), (911, 2), (950, 1), (987, 2), + (999, 1)]) +difference2 = map(lambda (a, b): (b, a), difference1) + +def comparelists(l1, l2): + print len(l1), len(l2) + for i in range(len(l1)): + if l1[i] != l2[i]: print l1[i], l2[i] + print l1 + print l2 + + + +class RListTest(unittest.TestCase): + def setUp(self): + """Make signatures, deltas""" + self.l1_sig = RList.Signatures(l1) + self.l2_sig = RList.Signatures(l2) + self.l1_to_l2_diff = RList.Deltas(self.l1_sig, l2) + self.l2_to_l1_diff = RList.Deltas(self.l2_sig, l1) + +# for d in makedeltas(makesigs(l2ci(l1)), l2ci(l2)): +# print d.min, d.max +# print d.elemlist + + def testPatching(self): + """Test to make sure each list can be reconstructed from other""" + newlist = list(RList.Patch(l1, RList.Deltas(RList.Signatures(l1), + l2))) + assert l2 == newlist + newlist = list(RList.Patch(l2, RList.Deltas(RList.Signatures(l2), + l1))) + assert l1 == newlist + + def testDifference(self): + """Difference between lists correctly identified""" + diff = list(RList.Dissimilar(l1, RList.Deltas(RList.Signatures(l1), + l2))) + assert diff == difference1 + diff = list(RList.Dissimilar(l2, RList.Deltas(RList.Signatures(l2), + l1))) + assert diff == difference2 + + + +class CachingIterTest(unittest.TestCase): + """Test the Caching Iter object""" + def testNormalIter(self): + """Make sure it can act like a normal iterator""" + ci = CachingIter(iter(range(10))) + for i in range(10): assert i == ci.next() + self.assertRaises(StopIteration, ci.next) + + def testPushing(self): + """Pushing extra objects onto the iterator""" + ci = CachingIter(iter(range(10))) + ci.push(12) + ci.push(11) + assert ci.next() == 11 + assert ci.next() == 12 + assert ci.next() == 0 + ci.push(10) + assert ci.next() == 10 + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/robusttest.py b/rdiff-backup/testing/robusttest.py new file mode 100644 index 0000000..4f08e44 --- /dev/null +++ b/rdiff-backup/testing/robusttest.py @@ -0,0 +1,86 @@ +import os, unittest + +execfile("commontest.py") +rbexec("setconnections.py") + +class TestRobustAction(unittest.TestCase): + """Test some robust actions""" + def testCopyWithAttribs(self): + """Test copy with attribs action""" + rpin = RPath(Globals.local_connection, "./testfiles/robust/in") + fp = open("./testfiles/robust/in", "wb") + fp.write("hello there") + fp.close() + os.chmod("./testfiles/robust/in", 0604) + rpin.setdata() + assert rpin.isreg() and rpin.getperms() % 01000 == 0604 + + rpout = RPath(Globals.local_connection, "./testfiles/robust/out") + Robust.copy_with_attribs_action(rpin, rpout).execute() + if not rpout == rpin: + print rpout, rpin + assert 0 + + rpout.delete() + rpin.delete() + + +class TempFileTest(unittest.TestCase): + """Test creation and management of tempfiles""" + rp_base = RPath(Globals.local_connection, + "./testfiles/robust/testfile_base") + def testBasic(self): + """Make a temp file, write to it, and then delete it + + Also test tempfile accounting and file name prefixing. + + """ + assert not TempFileManager._tempfiles + tf = TempFileManager.new(self.rp_base) + assert TempFileManager._tempfiles == [tf] + assert tf.dirsplit()[0] == "testfiles/robust", tf.dirsplit()[0] + assert not tf.lstat() + fp = tf.open("w") + fp.write("hello") + assert not fp.close() + fp = tf.open("r") + assert fp.read() == "hello" + assert not fp.close() + tf.delete() + assert not TempFileManager._tempfiles + + def testRename(self): + """Test renaming of tempfile""" + tf = TempFileManager.new(self.rp_base) + assert TempFileManager._tempfiles + tf.touch() + destination = RPath(Globals.local_connection, + "./testfiles/robust/testfile_dest") + tf.rename(destination) + assert not TempFileManager._tempfiles + assert destination.lstat() + destination.delete() + + +class SaveStateTest(unittest.TestCase): + """Test SaveState class""" + data_dir = RPath(Globals.local_connection, "testfiles/robust") + def testSymlinking(self): + """Test recording last file with symlink""" + last_rorp = RORPath(('usr', 'local', 'bin', 'ls')) + Globals.rbdir = self.data_dir + Time.setcurtime() + SetConnections.BackupInitConnections(Globals.local_connection, + Globals.local_connection) + SaveState.init_filenames(None) + SaveState.record_last_file_action(last_rorp).execute() + + sym_rp = RPath(Globals.local_connection, + "testfiles/robust/last-file-mirrored.%s.snapshot" % + Time.curtimestr) + assert sym_rp.issym() + assert sym_rp.readlink() == "increments/usr/local/bin/ls" + sym_rp.delete() + + +if __name__ == '__main__': unittest.main() diff --git a/rdiff-backup/testing/roottest.py b/rdiff-backup/testing/roottest.py new file mode 100644 index 0000000..18a0afc --- /dev/null +++ b/rdiff-backup/testing/roottest.py @@ -0,0 +1,165 @@ +execfile("../src/setconnections.py") +import unittest, os + +"""Root tests + +This is mainly a copy of regressiontest.py, but contains the two tests +that are meant to be run as root. +""" + +Globals.change_source_perms = 1 +Globals.counter = 0 +Log.setverbosity(4) + +class Local: + """This is just a place to put increments relative to the local + connection""" + def get_local_rp(extension): + return RPath(Globals.local_connection, "testfiles/" + extension) + + inc1rp = get_local_rp('increment1') + inc2rp = get_local_rp('increment2') + inc3rp = get_local_rp('increment3') + inc4rp = get_local_rp('increment4') + + rpout = get_local_rp('output') + rpout_inc = get_local_rp('output_inc') + rpout1 = get_local_rp('restoretarget1') + rpout2 = get_local_rp('restoretarget2') + rpout3 = get_local_rp('restoretarget3') + rpout4 = get_local_rp('restoretarget4') + + noperms = get_local_rp('noperms') + noperms_out = get_local_rp('noperms_output') + + rootfiles = get_local_rp('root') + rootfiles_out = get_local_rp('root_output') + + prefix = get_local_rp('.') + + +class PathSetter(unittest.TestCase): + def get_prefix_and_conn(self, path, return_path): + """Return (prefix, connection) tuple""" + if path: + return (return_path, + SetConnections.init_connection("./chdir-wrapper "+path)) + else: return ('./', Globals.local_connection) + + def get_src_rp(self, path): + return RPath(self.src_conn, self.src_prefix + path) + + def get_dest_rp(self, path): + return RPath(self.dest_conn, self.dest_prefix + path) + + + def set_rbdir(self, rpout): + """Create rdiff-backup-data dir if not already, tell everyone""" + self.rbdir = self.rpout.append('rdiff-backup-data') + self.rpout.mkdir() + self.rbdir.mkdir() + SetConnections.UpdateGlobal('rbdir', self.rbdir) + + # Better safe than sorry - cover all possibilities + Globals.add_regexp("testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("./testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("../testfiles/output/rdiff-backup-data", 1) + Globals.add_regexp("../../testfiles/output/rdiff-backup-data", 1) + + def setPathnames(self, src_path, src_return, dest_path, dest_return): + """Start servers which will run in src_path and dest_path respectively + + If either is None, then no server will be run and local + process will handle that end. src_return and dest_return are + the prefix back to the original rdiff-backup directory. So + for instance is src_path is "test2/tmp", then src_return will + be '../'. + + """ + # Clear old data that may rely on deleted connections + Globals.isbackup_writer = None + Globals.isbackup_reader = None + Globals.rbdir = None + + print "Setting up connection" + self.src_prefix, self.src_conn = \ + self.get_prefix_and_conn(src_path, src_return) + self.dest_prefix, self.dest_conn = \ + self.get_prefix_and_conn(dest_path, dest_return) + SetConnections.BackupInitConnections(self.src_conn, self.dest_conn) + + os.system("./myrm testfiles/output* testfiles/restoretarget* " + "testfiles/noperms_output testfiles/root_output " + "testfiles/unreadable_out") + self.inc1rp = self.get_src_rp("testfiles/increment1") + self.inc2rp = self.get_src_rp('testfiles/increment2') + self.inc3rp = self.get_src_rp('testfiles/increment3') + self.inc4rp = self.get_src_rp('testfiles/increment4') + + self.rpout_inc = self.get_dest_rp('testfiles/output_inc') + self.rpout1 = self.get_dest_rp('testfiles/restoretarget1') + self.rpout2 = self.get_dest_rp('testfiles/restoretarget2') + self.rpout3 = self.get_dest_rp('testfiles/restoretarget3') + self.rpout4 = self.get_dest_rp('testfiles/restoretarget4') + + self.rpout = self.get_dest_rp('testfiles/output') + self.set_rbdir(self.rpout) + + self.noperms = self.get_src_rp('testfiles/noperms') + self.noperms_out = self.get_dest_rp('testfiles/noperms_output') + + self.rootfiles = self.get_src_rp('testfiles/root') + self.rootfiles_out = self.get_dest_rp('testfiles/root_output') + + self.one_unreadable = self.get_src_rp('testfiles/one_unreadable') + self.one_unreadable_out = self.get_dest_rp('testfiles/unreadable_out') + + def tearDown(self): + print "Taking down connections" + SetConnections.CloseConnections() + os.system("./myrm testfiles/output* testfiles/restoretarget* " + "testfiles/noperms_output testfiles/root_output " + "testfiles/unreadable_out") + + +class MirrorTest(PathSetter): + """Test some mirroring functions""" + def refresh(self, *rps): + for rp in rps: rp.setdata() + + def testRootLocal(self): + """Test mirroring a directory with dev files and different owners""" + self.setPathnames(None, None, None, None) + Time.setcurtime() + SaveState.init_filenames(None) + Globals.change_ownership = 1 + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # add uid/gid info + HighLevel.Mirror(self.rootfiles, self.rootfiles_out) + assert RPath.cmp_recursive(Local.rootfiles, Local.rootfiles_out) + Globals.change_ownership = None + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # remove that info + + def testRootRemote(self): + """Mirroring root files both ends remote""" + self.setPathnames('test1', '../', 'test2/tmp', '../../') + Time.setcurtime() + SaveState.init_filenames(None) + for conn in Globals.connections: + conn.Globals.set('change_ownership', 1) + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # add uid/gid info + HighLevel.Mirror(self.rootfiles, self.rootfiles_out) + assert RPath.cmp_recursive(Local.rootfiles, Local.rootfiles_out) + for coon in Globals.connections: + conn.Globals.set('change_ownership', None) + self.refresh(self.rootfiles, self.rootfiles_out, + Local.rootfiles, Local.rootfiles_out) # remove that info + + def deleteoutput(self): + os.system("./myrm testfiles/output*") + self.reset_rps() + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/rorpitertest.py b/rdiff-backup/testing/rorpitertest.py new file mode 100644 index 0000000..b9d558f --- /dev/null +++ b/rdiff-backup/testing/rorpitertest.py @@ -0,0 +1,105 @@ +import unittest +execfile("commontest.py") +rbexec("highlevel.py") + + +#Log.setverbosity(8) + +class index: + """This is just used below to test the iter tree reducer""" + def __init__(self, index): + self.index = index + + +class RORPIterTest(unittest.TestCase): + def setUp(self): + self.lc = Globals.local_connection + self.inc0rp = RPath(self.lc, "testfiles/empty", ()) + self.inc1rp = RPath(self.lc, "testfiles/inc-reg-perms1", ()) + self.inc2rp = RPath(self.lc, "testfiles/inc-reg-perms2", ()) + self.output = RPath(self.lc, "testfiles/output", ()) + + def testCollateIterators(self): + """Test basic collating""" + indicies = map(index, [0,1,2,3]) + helper = lambda i: indicies[i] + + makeiter1 = lambda: iter(indicies) + makeiter2 = lambda: iter(map(helper, [0,1,3])) + makeiter3 = lambda: iter(map(helper, [1,2])) + + outiter = RORPIter.CollateIterators(makeiter1(), makeiter2()) + assert Iter.equal(outiter, + iter([(indicies[0], indicies[0]), + (indicies[1], indicies[1]), + (indicies[2], None), + (indicies[3], indicies[3])])) + + assert Iter.equal(RORPIter.CollateIterators(makeiter1(), + makeiter2(), + makeiter3()), + iter([(indicies[0], indicies[0], None), + (indicies[1], indicies[1], indicies[1]), + (indicies[2], None, indicies[2]), + (indicies[3], indicies[3], None)])) + + assert Iter.equal(RORPIter.CollateIterators(makeiter1(), iter([])), + iter(map(lambda i: (i, None), + indicies))) + assert Iter.equal(iter(map(lambda i: (i, None), indicies)), + RORPIter.CollateIterators(makeiter1(), iter([]))) + + + def testCombinedPatching(self): + """Combined signature, patch, and diff operations""" + if self.output.lstat(): self.output.delete() + + def turninto(final_rp): + sigfile = RORPIter.ToFile(RORPIter.GetSignatureIter(self.output)) + diff_file = RORPIter.ToFile( + RORPIter.GetDiffIter(RORPIter.FromFile(sigfile), + RORPIter.IterateRPaths(final_rp))) + RORPIter.PatchIter(self.output, RORPIter.FromFile(diff_file)) + + turninto(self.inc1rp) + assert self.compare_rps(self.output, self.inc1rp) + turninto(self.inc2rp) + assert self.compare_rps(self.output, self.inc2rp) + + def compare_rps(self, rp1, rp2): + """True if rp1 and rp2 are equal in some sense""" + def RawIter(rp): + """Get raw iterator of file stats based an rp1""" + return RORPIter.ToRaw(Iter.map(lambda rp2: rp2.getRORPath(), + RORPIter.IterateRPaths(rp))) + ri1 = RawIter(rp1) + ri2 = RawIter(rp2) + try: + rorp1 = ri1.next() + rorp2 = ri2.next() + assert rorp1 == rorp2, "%s %s" % (rorp1, rorp2) + except StopIteration: pass + return 1 + # return Iter.equal(RawIter(rp1), RawIter(rp2)) + + +class IndexedTupleTest(unittest.TestCase): + def testTuple(self): + """Test indexed tuple""" + i = IndexedTuple((1,2,3), ("a", "b")) + i2 = IndexedTuple((), ("hello", "there", "how are you")) + + assert i[0] == "a" + assert i[1] == "b" + assert i2[1] == "there" + assert len(i) == 2 and len(i2) == 3 + assert i2 < i, i2 < i + + def testTupleAssignment(self): + a, b, c = IndexedTuple((), (1, 2, 3)) + assert a == 1 + assert b == 2 + assert c == 3 + +if __name__ == "__main__": unittest.main() + diff --git a/rdiff-backup/testing/rpathtest.py b/rdiff-backup/testing/rpathtest.py new file mode 100644 index 0000000..0b28f14 --- /dev/null +++ b/rdiff-backup/testing/rpathtest.py @@ -0,0 +1,337 @@ +import os, cPickle, sys, unittest +execfile("commontest.py") +rbexec("highlevel.py") + + + +class RPathTest(unittest.TestCase): + lc = Globals.local_connection + prefix = "testfiles/various_file_types/" + mainprefix = "testfiles/" + rp_prefix = RPath(lc, prefix, ()) + rp_main = RPath(lc, mainprefix, ()) + + +class RORPStateTest(RPathTest): + """Test Pickling of RORPaths""" + def testPickle(self): + rorp = RPath(self.lc, self.prefix, ("regular_file",)).getRORPath() + rorp.file = sys.stdin # try to confuse pickler + assert rorp.isreg() + rorp2 = cPickle.loads(cPickle.dumps(rorp, 1)) + assert rorp2.isreg() + assert rorp2.data == rorp.data and rorp.index == rorp2.index + + +class CheckTypes(RPathTest): + """Check to see if file types are identified correctly""" + def testExist(self): + """Can tell if files exist""" + assert RPath(self.lc, self.prefix, ()).lstat() + assert not RPath(self.lc, "asuthasetuouo", ()).lstat() + + def testDir(self): + """Directories identified correctly""" + assert RPath(self.lc, self.prefix, ()).isdir() + assert not RPath(self.lc, self.prefix, ("regular_file",)).isdir() + + def testSym(self): + """Symbolic links identified""" + assert RPath(self.lc, self.prefix, ("symbolic_link",)).issym() + assert not RPath(self.lc, self.prefix, ()).issym() + + def testReg(self): + """Regular files identified""" + assert RPath(self.lc, self.prefix, ("regular_file",)).isreg() + assert not RPath(self.lc, self.prefix, ("symbolic_link",)).isreg() + + def testFifo(self): + """Fifo's identified""" + assert RPath(self.lc, self.prefix, ("fifo",)).isfifo() + assert not RPath(self.lc, self.prefix, ()).isfifo() + + def testCharDev(self): + """Char special files identified""" + assert RPath(self.lc, "/dev/tty2", ()).ischardev() + assert not RPath(self.lc, self.prefix, ("regular_file",)).ischardev() + + def testBlockDev(self): + """Block special files identified""" + assert RPath(self.lc, "/dev/hda", ()).isblkdev() + assert not RPath(self.lc, self.prefix, ("regular_file",)).isblkdev() + + +class CheckPerms(RPathTest): + """Check to see if permissions are reported and set accurately""" + def testExecReport(self): + """Check permissions for executable files""" + assert self.rp_prefix.append('executable').getperms() == 0755 + assert self.rp_prefix.append('executable2').getperms() == 0700 + + def testOrdinaryReport(self): + """Ordinary file permissions...""" + assert self.rp_prefix.append("regular_file").getperms() == 0644 + assert self.rp_prefix.append('two_hardlinked_files1').getperms() == 0640 + + def testChmod(self): + """Test changing file permission""" + rp = self.rp_prefix.append("changeable_permission") + rp.chmod(0700) + assert rp.getperms() == 0700 + rp.chmod(0644) + assert rp.getperms() == 0644 + + def testExceptions(self): + """What happens when file absent""" + self.assertRaises(Exception, + RPath(self.lc, self.prefix, ("aoeunto",)).getperms) + + +class CheckDir(RPathTest): + """Check directory related functions""" + def testCreation(self): + """Test directory creation and deletion""" + d = self.rp_prefix.append("tempdir") + assert not d.lstat() + d.mkdir() + assert d.isdir() + d.rmdir() + assert not d.lstat() + + def testExceptions(self): + """Should raise os.errors when no files""" + d = RPath(self.lc, self.prefix, ("suthosutho",)) + self.assertRaises(os.error, d.rmdir) + d.mkdir() + self.assertRaises(os.error, d.mkdir) + d.rmdir() + + def testListdir(self): + """Checking dir listings""" + assert (RPath(self.lc, self.mainprefix, ("sampledir",)).listdir() == + ["1", "2", "3", "4"]) + + +class CheckSyms(RPathTest): + """Check symlinking and reading""" + def testRead(self): + """symlink read""" + assert (RPath(self.lc, self.prefix, ("symbolic_link",)).readlink() == + "regular_file") + + def testMake(self): + """Creating symlink""" + link = RPath(self.lc, self.mainprefix, ("symlink",)) + assert not link.lstat() + link.symlink("abcdefg") + assert link.issym() + assert link.readlink() == "abcdefg" + link.delete() + + +class TouchDelete(RPathTest): + """Check touching and deletion of files""" + def testTouch(self): + """Creation of 0 length files""" + t = RPath(self.lc, self.mainprefix, ("testtouch",)) + assert not t.lstat() + t.touch() + assert t.lstat() + t.delete() + + def testDelete(self): + """Deletion of files""" + d = RPath(self.lc, self.mainprefix, ("testdelete",)) + d.touch() + assert d.lstat() + d.delete() + assert not d.lstat() + + +class MiscFileInfo(RPathTest): + """Check Miscellaneous file information""" + def testFileLength(self): + """File length = getsize()""" + assert (RPath(self.lc, self.prefix, ("regular_file",)).getsize() == + 75650) + + +class FilenameOps(RPathTest): + """Check filename operations""" + weirdfilename = eval('\'\\xd8\\xab\\xb1Wb\\xae\\xc5]\\x8a\\xbb\\x15v*\\xf4\\x0f!\\xf9>\\xe2Y\\x86\\xbb\\xab\\xdbp\\xb0\\x84\\x13k\\x1d\\xc2\\xf1\\xf5e\\xa5U\\x82\\x9aUV\\xa0\\xf4\\xdf4\\xba\\xfdX\\x03\\x82\\x07s\\xce\\x9e\\x8b\\xb34\\x04\\x9f\\x17 \\xf4\\x8f\\xa6\\xfa\\x97\\xab\\xd8\\xac\\xda\\x85\\xdcKvC\\xfa#\\x94\\x92\\x9e\\xc9\\xb7\\xc3_\\x0f\\x84g\\x9aB\\x11<=^\\xdbM\\x13\\x96c\\x8b\\xa7|*"\\\\\\\'^$@#!(){}?+ ~` \'') + normdict = {"/": "/", + ".": ".", + "//": "/", + "/a/b": "/a/b", + "a/b": "a/b", + "a//b": "a/b", + "a////b//c": "a/b/c", + "..": "..", + "a/": "a", + "/a//b///": "/a/b"} + dirsplitdict = {"/": ("", ""), + "/a": ("", "a"), + "/a/b": ("/a", "b"), + ".": (".", "."), + "b/c": ("b", "c"), + "a": (".", "a")} + + def testQuote(self): + """See if filename quoting works""" + wtf = RPath(self.lc, self.prefix, (self.weirdfilename,)) + reg = RPath(self.lc, self.prefix, ("regular_file",)) + assert wtf.lstat() + assert reg.lstat() + assert not os.system("ls %s >/dev/null 2>&1" % wtf.quote()) + assert not os.system("ls %s >/dev/null 2>&1" % reg.quote()) + + def testNormalize(self): + """rpath.normalize() dictionary test""" + for (before, after) in self.normdict.items(): + assert RPath(self.lc, before, ()).normalize().path == after, \ + "Normalize fails for %s => %s" % (before, after) + + def testDirsplit(self): + """Test splitting of various directories""" + for full, split in self.dirsplitdict.items(): + result = RPath(self.lc, full, ()).dirsplit() + assert result == split, \ + "%s => %s instead of %s" % (full, result, split) + + def testGetnums(self): + """Test getting file numbers""" + devnums = RPath(self.lc, "/dev/hda", ()).getdevnums() + assert devnums == (3, 0), devnums + devnums = RPath(self.lc, "/dev/tty2", ()).getdevnums() + assert devnums == (4, 2), devnums + + +class FileIO(RPathTest): + """Test file input and output""" + def testRead(self): + """File reading""" + fp = RPath(self.lc, self.prefix, ("executable",)).open("r") + assert fp.read(6) == "#!/bin" + fp.close() + + def testWrite(self): + """File writing""" + rp = RPath(self.lc, self.mainprefix, ("testfile",)) + fp = rp.open("w") + fp.write("hello") + fp.close() + fp_input = rp.open("r") + assert fp_input.read() == "hello" + fp_input.close() + rp.delete() + + +class FileCopying(RPathTest): + """Test file copying and comparison""" + def setUp(self): + self.hl1 = RPath(self.lc, self.prefix, ("two_hardlinked_files1",)) + self.hl2 = RPath(self.lc, self.prefix, ("two_hardlinked_files2",)) + self.sl = RPath(self.lc, self.prefix, ("symbolic_link",)) + self.dir = RPath(self.lc, self.prefix, ()) + self.fifo = RPath(self.lc, self.prefix, ("fifo",)) + self.rf = RPath(self.lc, self.prefix, ("regular_file",)) + self.dest = RPath(self.lc, self.mainprefix, ("dest",)) + if self.dest.lstat(): self.dest.delete() + assert not self.dest.lstat() + + def testComp(self): + """Test comparisons involving regular files""" + assert RPath.cmp(self.hl1, self.hl2) + assert not RPath.cmp(self.rf, self.hl1) + assert not RPath.cmp(self.dir, self.rf) + + def testCompMisc(self): + """Test miscellaneous comparisons""" + assert RPath.cmp(self.dir, RPath(self.lc, self.mainprefix, ())) + self.dest.symlink("regular_file") + assert RPath.cmp(self.sl, self.dest) + self.dest.delete() + assert not RPath.cmp(self.sl, self.fifo) + assert not RPath.cmp(self.dir, self.sl) + + def testCopy(self): + """Test copy of various files""" + for rp in [self.sl, self.rf, self.fifo, self.dir]: + RPath.copy(rp, self.dest) + assert self.dest.lstat(), "%s doesn't exist" % self.dest.path + assert RPath.cmp(rp, self.dest) + assert RPath.cmp(self.dest, rp) + self.dest.delete() + + +class FileAttributes(FileCopying): + """Test file attribute operations""" + def setUp(self): + FileCopying.setUp(self) + self.noperms = RPath(self.lc, self.mainprefix, ("noperms",)) + self.nowrite = RPath(self.lc, self.mainprefix, ("nowrite",)) + self.exec1 = RPath(self.lc, self.prefix, ("executable",)) + self.exec2 = RPath(self.lc, self.prefix, ("executable2",)) + self.test = RPath(self.lc, self.prefix, ("test",)) + self.nothing = RPath(self.lc, self.prefix, ("aoeunthoenuouo",)) + self.sym = RPath(self.lc, self.prefix, ("symbolic_link",)) + + def testComp(self): + """Test attribute comparison success""" + testpairs = [(self.hl1, self.hl2)] + for a, b in testpairs: + assert RPath.cmp_attribs(a, b), "Err with %s %s" % (a.path, b.path) + assert RPath.cmp_attribs(b, a), "Err with %s %s" % (b.path, a.path) + + def testCompFail(self): + """Test attribute comparison failures""" + testpairs = [(self.nowrite, self.noperms), + (self.exec1, self.exec2), + (self.rf, self.hl1)] + for a, b in testpairs: + assert not RPath.cmp_attribs(a, b), \ + "Err with %s %s" % (a.path, b.path) + assert not RPath.cmp_attribs(b, a), \ + "Err with %s %s" % (b.path, a.path) + + def testCompRaise(self): + """Should raise exception when file missing""" + self.assertRaises(RPathException, RPath.cmp_attribs, + self.nothing, self.hl1) + self.assertRaises(RPathException, RPath.cmp_attribs, + self.noperms, self.nothing) + + def testCopyAttribs(self): + """Test copying attributes""" + t = RPath(self.lc, self.mainprefix, ("testattribs",)) + if t.lstat(): t.delete() + for rp in [self.noperms, self.nowrite, self.rf, self.exec1, + self.exec2, self.hl1, self.dir]: + t.touch() + RPath.copy_attribs(rp, t) + assert RPath.cmp_attribs(t, rp), \ + "Attributes for file %s not copied successfully" % rp.path + t.delete() + + def testCopyWithAttribs(self): + """Test copying with attribs (bug found earlier)""" + out = RPath(self.lc, self.mainprefix, ("out",)) + if out.lstat(): out.delete() + for rp in [self.noperms, self.nowrite, self.rf, self.exec1, + self.exec2, self.hl1, self.dir, self.sym]: + RPath.copy_with_attribs(rp, out) + assert RPath.cmp(rp, out) + assert RPath.cmp_attribs(rp, out) + out.delete() + + def testCopyRaise(self): + """Should raise exception for non-existent files""" + self.assertRaises(RPathException, RPath.copy_attribs, + self.hl1, self.nothing) + self.assertRaises(RPathException, RPath.copy_attribs, + self.nothing, self.nowrite) + + + +if __name__ == "__main__": + unittest.main() diff --git a/rdiff-backup/testing/server.py b/rdiff-backup/testing/server.py new file mode 100755 index 0000000..b30c745 --- /dev/null +++ b/rdiff-backup/testing/server.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python + +import sys +execfile("commontest.py") +rbexec("setconnections.py") + +def Test_SetConnGlobals(conn, name, val): + """Used in unittesting - set one of specified connection's Global vars""" + conn.Globals.set(name, val) + +Log.setverbosity(9) +PipeConnection(sys.stdin, sys.stdout).Server() diff --git a/rdiff-backup/testing/setconnectionstest.py b/rdiff-backup/testing/setconnectionstest.py new file mode 100644 index 0000000..d5d2671 --- /dev/null +++ b/rdiff-backup/testing/setconnectionstest.py @@ -0,0 +1,26 @@ +import unittest +execfile("commontest.py") +rbexec("setconnections.py") + +class SetConnectionsTest(unittest.TestCase): + """Set SetConnections Class""" + def testParsing(self): + """Test parsing of various file descriptors""" + pfd = SetConnections.parse_file_desc + assert pfd("bescoto@folly.stanford.edu::/usr/bin/ls") == \ + ("bescoto@folly.stanford.edu", "/usr/bin/ls") + assert pfd("hello there::/goodbye:euoeu") == \ + ("hello there", "/goodbye:euoeu") + assert pfd(r"test\\ing\::more::and more\\..") == \ + (r"test\ing::more", r"and more\\.."), \ + pfd(r"test\\ing\::more::and more\\..") + assert pfd("a:b:c:d::e") == ("a:b:c:d", "e") + assert pfd("foobar") == (None, "foobar") + assert pfd(r"hello\::there") == (None, "hello\::there") + + self.assertRaises(SetConnectionsException, pfd, r"hello\:there::") + self.assertRaises(SetConnectionsException, pfd, "foobar\\") + + + +if __name__ == "__main__": unittest.main() diff --git a/rdiff-backup/testing/statictest.py b/rdiff-backup/testing/statictest.py new file mode 100644 index 0000000..a9ff812 --- /dev/null +++ b/rdiff-backup/testing/statictest.py @@ -0,0 +1,63 @@ +import unittest, types +execfile("commontest.py") +rbexec("static.py") + + +class D: + def foo(x, y): + return x, y + def bar(self, x): + return 3, x + def _hello(self): + return self + +MakeStatic(D) + + +class C: + _a = 0 + def get(cls): + return cls._a + def inc(cls): + cls._a = cls._a + 1 + +MakeClass(C) + + +class StaticMethodsTest(unittest.TestCase): + """Test StaticMethods module""" + def testType(self): + """Methods should have type StaticMethod""" + assert type(D.foo) is types.FunctionType + assert type(D.bar) is types.FunctionType + + def testStatic(self): + """Methods should be callable without instance""" + assert D.foo(1,2) == (1,2) + assert D.bar(3,4) == (3,4) + + def testBound(self): + """Methods should also work bound""" + d = D() + assert d.foo(1,2) == (1,2) + assert d.bar(3,4) == (3,4) + + def testStatic_(self): + """_ Methods should be untouched""" + d = D() + self.assertRaises(TypeError, d._hello, 4) + assert d._hello() is d + + +class ClassMethodsTest(unittest.TestCase): + def test(self): + """Test MakeClass function""" + assert C.get() == 0 + C.inc() + assert C.get() == 1 + C.inc() + assert C.get() == 2 + + +if __name__ == "__main__": + unittest.main() diff --git a/rdiff-backup/testing/testall.py b/rdiff-backup/testing/testall.py new file mode 100644 index 0000000..5389408 --- /dev/null +++ b/rdiff-backup/testing/testall.py @@ -0,0 +1,26 @@ +import unittest + +"""This probably doesn't work any more - just run the tests manually.""" + +from connectiontest import * +#from destructive-steppingtest import * +from dstest import * +from highleveltest import * +from incrementtest import * +from iterfiletest import * +from lazytest import * +from rdifftest import * +from regressiontest import * +from restoretest import * +from rlisttest import * +from rorpitertest import * +from rpathtest import * +#from finaltest import * +from statictest import * +from timetest import * +from filelisttest import * +from setconnectionstest import * + +if __name__ == "__main__": + unittest.main() + diff --git a/rdiff-backup/testing/timetest.py b/rdiff-backup/testing/timetest.py new file mode 100644 index 0000000..f7a6fcd --- /dev/null +++ b/rdiff-backup/testing/timetest.py @@ -0,0 +1,71 @@ +import unittest +execfile("commontest.py") +rbexec("highlevel.py") + +class TimeTest(unittest.TestCase): + def testConversion(self): + """test timetostring and stringtotime""" + Time.setcurtime() + assert type(Time.curtime) is types.FloatType + assert type(Time.curtimestr) is types.StringType + assert (Time.cmp(int(Time.curtime), Time.curtimestr) == 0 or + Time.cmp(int(Time.curtime) + 1, Time.curtimestr) == 0) + time.sleep(1.05) + assert Time.cmp(time.time(), Time.curtime) == 1 + assert Time.cmp(Time.timetostring(time.time()), Time.curtimestr) == 1 + + def testConversion_separator(self): + """Same as testConversion, but change time Separator""" + Globals.time_separator = "_" + self.testConversion() + Globals.time_separator = ":" + + def testCmp(self): + """Test time comparisons""" + cmp = Time.cmp + assert cmp(1,2) == -1 + assert cmp(2,2) == 0 + assert cmp(5,1) == 1 + assert cmp("2001-09-01T21:49:04Z", "2001-08-01T21:49:04Z") == 1 + assert cmp("2001-09-01T04:49:04+03:23", "2001-09-01T21:49:04Z") == -1 + assert cmp("2001-09-01T12:00:00Z", "2001-09-01T04:00:00-08:00") == 0 + assert cmp("2001-09-01T12:00:00-08:00", + "2001-09-01T12:00:00-07:00") == 1 + + def testCmp_separator(self): + """Like testCmp but with new separator""" + Globals.time_separator = "_" + cmp = Time.cmp + assert cmp(1,2) == -1 + assert cmp(2,2) == 0 + assert cmp(5,1) == 1 + assert cmp("2001-09-01T21_49_04Z", "2001-08-01T21_49_04Z") == 1 + assert cmp("2001-09-01T04_49_04+03_23", "2001-09-01T21_49_04Z") == -1 + assert cmp("2001-09-01T12_00_00Z", "2001-09-01T04_00_00-08_00") == 0 + assert cmp("2001-09-01T12_00_00-08_00", + "2001-09-01T12_00_00-07_00") == 1 + Globals.time_separator = ":" + + def testStringtotime(self): + """Test converting string to time""" + timesec = int(time.time()) + assert timesec == int(Time.stringtotime(Time.timetostring(timesec))) + assert not Time.stringtotime("2001-18-83T03:03:03Z") + assert not Time.stringtotime("2001-01-23L03:03:03L") + assert not Time.stringtotime("2001_01_23T03:03:03Z") + + def testIntervals(self): + """Test converting strings to intervals""" + i2s = Time.intstringtoseconds + for s in ["32", "", "d", "231I", "MM", "s", "-2h"]: + try: i2s(s) + except TimeException: pass + else: assert 0, s + assert i2s("7D") == 7*86400 + assert i2s("232s") == 232 + assert i2s("2M") == 2*30*86400 + assert i2s("400m") == 400*60 + assert i2s("1Y") == 365*86400 + assert i2s("30h") == 30*60*60 + +if __name__ == '__main__': unittest.main() -- cgit v1.2.1