summaryrefslogtreecommitdiff
path: root/subversion/tests/cmdline/svntest/sandbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'subversion/tests/cmdline/svntest/sandbox.py')
-rw-r--r--subversion/tests/cmdline/svntest/sandbox.py253
1 files changed, 225 insertions, 28 deletions
diff --git a/subversion/tests/cmdline/svntest/sandbox.py b/subversion/tests/cmdline/svntest/sandbox.py
index ad67ac0..8451bb1 100644
--- a/subversion/tests/cmdline/svntest/sandbox.py
+++ b/subversion/tests/cmdline/svntest/sandbox.py
@@ -24,14 +24,68 @@
import os
import shutil
import copy
-import urllib
import logging
+import re
import svntest
logger = logging.getLogger()
+def make_mirror(sbox, source_prop_encoding=None):
+ """Make a mirror of the repository in SBOX.
+ """
+ # Set up the mirror repository.
+ dest_sbox = sbox.clone_dependent()
+ dest_sbox.build(create_wc=False, empty=True)
+ exit_code, output, errput = svntest.main.run_svnlook("uuid", sbox.repo_dir)
+ svntest.actions.run_and_verify_svnadmin2(None, None, 0,
+ 'setuuid', dest_sbox.repo_dir,
+ output[0][:-1])
+ svntest.actions.enable_revprop_changes(dest_sbox.repo_dir)
+
+ repo_url = sbox.repo_url
+ dest_repo_url = dest_sbox.repo_url
+
+ # Synchronize it.
+ args = (svntest.main.svnrdump_crosscheck_authentication,)
+ if source_prop_encoding:
+ args = args + ("--source-prop-encoding=" + source_prop_encoding,)
+ svntest.actions.run_and_verify_svnsync(svntest.verify.AnyOutput, [],
+ "initialize",
+ dest_repo_url, repo_url, *args)
+ svntest.actions.run_and_verify_svnsync(None, [],
+ "synchronize",
+ dest_repo_url, repo_url, *args)
+
+ return dest_sbox
+
+def verify_mirror(repo_url, repo_dir, expected_dumpfile):
+ """Compare the repository content at REPO_URL/REPO_DIR with that in
+ EXPECTED_DUMPFILE (which is a non-delta dump).
+ """
+ # Remove some SVNSync-specific housekeeping properties from the
+ # mirror repository in preparation for the comparison dump.
+ for prop_name in ("svn:sync-from-url", "svn:sync-from-uuid",
+ "svn:sync-last-merged-rev"):
+ svntest.actions.run_and_verify_svn(
+ None, [], "propdel", "--revprop", "-r", "0",
+ prop_name, repo_url)
+ # Create a dump file from the mirror repository.
+ dumpfile_s_n = svntest.actions.run_and_verify_dump(repo_dir)
+ # Compare the mirror's dumpfile, ignoring any expected differences:
+ # The original dumpfile in some cases lacks 'Text-content-sha1' headers;
+ # the mirror dump always has them -- ### Why?
+ svnsync_headers_always = re.compile("Text-content-sha1: ")
+ dumpfile_a_n_cmp = [l for l in expected_dumpfile
+ if not svnsync_headers_always.match(l)]
+ dumpfile_s_n_cmp = [l for l in dumpfile_s_n
+ if not svnsync_headers_always.match(l)]
+ svntest.verify.compare_dump_files(None, None,
+ dumpfile_a_n_cmp,
+ dumpfile_s_n_cmp)
+
+
class Sandbox:
"""Manages a sandbox (one or more repository/working copy pairs) for
a test to operate within."""
@@ -46,7 +100,9 @@ class Sandbox:
# This flag is set to True by build() and returned by is_built()
self._is_built = False
- def _set_name(self, name, read_only=False):
+ self.was_cwd = os.getcwd()
+
+ def _set_name(self, name, read_only=False, empty=False):
"""A convenience method for renaming a sandbox, useful when
working with multiple repositories in the same unit test."""
if not name is None:
@@ -54,32 +110,19 @@ class Sandbox:
self.read_only = read_only
self.wc_dir = os.path.join(svntest.main.general_wc_dir, self.name)
self.add_test_path(self.wc_dir)
- if not read_only:
+ if empty or not read_only: # use a local repo
self.repo_dir = os.path.join(svntest.main.general_repo_dir, self.name)
self.repo_url = (svntest.main.options.test_area_url + '/'
- + urllib.pathname2url(self.repo_dir))
+ + svntest.wc.svn_uri_quote(
+ self.repo_dir.replace(os.path.sep, '/')))
self.add_test_path(self.repo_dir)
else:
self.repo_dir = svntest.main.pristine_greek_repos_dir
self.repo_url = svntest.main.pristine_greek_repos_url
- ### TODO: Move this into to the build() method
- # For dav tests we need a single authz file which must be present,
- # so we recreate it each time a sandbox is created with some default
- # contents, making sure that an empty file is never present
if self.repo_url.startswith("http"):
- # this dir doesn't exist out of the box, so we may have to make it
- if not os.path.exists(svntest.main.work_dir):
- os.makedirs(svntest.main.work_dir)
self.authz_file = os.path.join(svntest.main.work_dir, "authz")
- tmp_authz_file = os.path.join(svntest.main.work_dir, "authz-" + self.name)
- open(tmp_authz_file, 'w').write("[/]\n* = rw\n")
- shutil.move(tmp_authz_file, self.authz_file)
self.groups_file = os.path.join(svntest.main.work_dir, "groups")
-
- # For svnserve tests we have a per-repository authz file, and it
- # doesn't need to be there in order for things to work, so we don't
- # have any default contents.
elif self.repo_url.startswith("svn"):
self.authz_file = os.path.join(self.repo_dir, "conf", "authz")
self.groups_file = os.path.join(self.repo_dir, "conf", "groups")
@@ -102,15 +145,32 @@ class Sandbox:
shutil.copytree(self.wc_dir, clone.wc_dir, symlinks=True)
return clone
- def build(self, name=None, create_wc=True, read_only=False,
+ def build(self, name=None, create_wc=True, read_only=False, empty=False,
minor_version=None):
"""Make a 'Greek Tree' repo (or refer to the central one if READ_ONLY),
+ or make an empty repo if EMPTY is true,
and check out a WC from it (unless CREATE_WC is false). Change the
sandbox's name to NAME. See actions.make_repo_and_wc() for details."""
- self._set_name(name, read_only)
- svntest.actions.make_repo_and_wc(self, create_wc, read_only, minor_version)
+ self._set_name(name, read_only, empty)
+ self._ensure_authz()
+ svntest.actions.make_repo_and_wc(self, create_wc, read_only, empty,
+ minor_version)
self._is_built = True
+ def _ensure_authz(self):
+ "make sure the repository is accessible"
+
+ if self.repo_url.startswith("http"):
+ default_authz = "[/]\n* = rw\n"
+
+ if (svntest.main.options.parallel == 0
+ and (not os.path.isfile(self.authz_file)
+ or open(self.authz_file,'r').read() != default_authz)):
+
+ tmp_authz_file = os.path.join(svntest.main.work_dir, "authz-" + self.name)
+ open(tmp_authz_file, 'w').write(default_authz)
+ shutil.move(tmp_authz_file, self.authz_file)
+
def authz_name(self, repo_dir=None):
"return this sandbox's name for use in an authz file"
repo_dir = repo_dir or self.repo_dir
@@ -135,7 +195,8 @@ class Sandbox:
path = (os.path.join(svntest.main.general_repo_dir, self.name)
+ '.' + suffix)
url = svntest.main.options.test_area_url + \
- '/' + urllib.pathname2url(path)
+ '/' + svntest.wc.svn_uri_quote(
+ path.replace(os.path.sep, '/'))
self.add_test_path(path, remove)
return path, url
@@ -185,7 +246,11 @@ class Sandbox:
of this sbox, or relative to OS-style path WC_DIR if supplied."""
if wc_dir is None:
wc_dir = self.wc_dir
- return os.path.join(wc_dir, svntest.wc.to_ospath(relpath))
+
+ if relpath == '':
+ return wc_dir
+ else:
+ return os.path.join(wc_dir, svntest.wc.to_ospath(relpath))
def ospaths(self, relpaths, wc_dir=None):
"""Return a list of RELPATHS but with each path converted to an OS-style
@@ -212,6 +277,12 @@ class Sandbox:
temporary and 'TEMP' or 'PERM',
parts[1])
+ def file_protocol_url(self):
+ """get a file:// url pointing to the repository"""
+ return svntest.main.file_scheme_prefix + \
+ svntest.wc.svn_uri_quote(
+ os.path.abspath(self.repo_dir).replace(os.path.sep, '/'))
+
def simple_update(self, target=None, revision='HEAD'):
"""Update the WC or TARGET.
TARGET is a relpath relative to the WC."""
@@ -317,15 +388,18 @@ class Sandbox:
raise Exception("Unexpected line '" + line + "' in proplist output" + str(out))
return props
- def simple_add_symlink(self, dest, target):
- """Create a symlink TARGET pointing to DEST and add it to subversion"""
+ def simple_symlink(self, dest, target):
+ """Create a symlink TARGET pointing to DEST"""
if svntest.main.is_posix_os():
os.symlink(dest, self.ospath(target))
else:
svntest.main.file_write(self.ospath(target), "link %s" % dest)
+
+ def simple_add_symlink(self, dest, target, add=True):
+ """Create a symlink TARGET pointing to DEST and add it to subversion"""
+ self.simple_symlink(dest, target)
self.simple_add(target)
- if not svntest.main.is_posix_os():
- # '*' is evaluated on Windows
+ if not svntest.main.is_posix_os(): # '*' is evaluated on Windows
self.simple_propset('svn:special', 'X', target)
def simple_add_text(self, text, *targets):
@@ -360,7 +434,7 @@ class Sandbox:
def simple_append(self, dest, contents, truncate=False):
"""Append CONTENTS to file DEST, optionally truncating it first.
DEST is a relpath relative to the WC."""
- open(self.ospath(dest), truncate and 'w' or 'a').write(contents)
+ open(self.ospath(dest), truncate and 'wb' or 'ab').write(contents)
def simple_lock(self, *targets):
"""Lock TARGETS in the WC.
@@ -369,6 +443,129 @@ class Sandbox:
targets = self.ospaths(targets)
svntest.main.run_svn(False, 'lock', *targets)
+ def youngest(self):
+ _, output, _ = svntest.actions.run_and_verify_svnlook(
+ svntest.verify.AnyOutput, [],
+ 'youngest', self.repo_dir)
+ youngest = int(output[0])
+ return youngest
+
+ def verify_repo(self):
+ """
+ """
+ svnrdump_headers_missing = re.compile(
+ "Text-content-sha1: .*|Text-copy-source-md5: .*|"
+ "Text-copy-source-sha1: .*|Text-delta-base-sha1: .*"
+ )
+ svnrdump_headers_always = re.compile(
+ "Prop-delta: .*"
+ )
+
+ dumpfile_a_n = svntest.actions.run_and_verify_dump(self.repo_dir,
+ deltas=False)
+ dumpfile_a_d = svntest.actions.run_and_verify_dump(self.repo_dir,
+ deltas=True)
+ dumpfile_r_d = svntest.actions.run_and_verify_svnrdump(
+ None, svntest.verify.AnyOutput, [], 0, 'dump', '-q', self.repo_url,
+ svntest.main.svnrdump_crosscheck_authentication)
+
+ # Compare the two deltas dumpfiles, ignoring expected differences
+ dumpfile_a_d_cmp = [l for l in dumpfile_a_d
+ if not svnrdump_headers_missing.match(l)
+ and not svnrdump_headers_always.match(l)]
+ dumpfile_r_d_cmp = [l for l in dumpfile_r_d
+ if not svnrdump_headers_always.match(l)]
+ # Ignore differences in number of blank lines between node records,
+ # as svnrdump puts 3 whereas svnadmin puts 2 after a replace-with-copy.
+ svntest.verify.compare_dump_files(None, None,
+ dumpfile_a_d_cmp,
+ dumpfile_r_d_cmp,
+ ignore_number_of_blank_lines=True)
+
+ # Try loading the dump files.
+ # For extra points, load each with the other tool:
+ # svnadmin dump | svnrdump load
+ # svnrdump dump | svnadmin load
+ repo_dir_a_n, repo_url_a_n = self.add_repo_path('load_a_n')
+ svntest.main.create_repos(repo_dir_a_n)
+ svntest.actions.enable_revprop_changes(repo_dir_a_n)
+ svntest.actions.run_and_verify_svnrdump(
+ dumpfile_a_n, svntest.verify.AnyOutput, [], 0, 'load', repo_url_a_n,
+ svntest.main.svnrdump_crosscheck_authentication)
+
+ repo_dir_a_d, repo_url_a_d = self.add_repo_path('load_a_d')
+ svntest.main.create_repos(repo_dir_a_d)
+ svntest.actions.enable_revprop_changes(repo_dir_a_d)
+ svntest.actions.run_and_verify_svnrdump(
+ dumpfile_a_d, svntest.verify.AnyOutput, [], 0, 'load', repo_url_a_d,
+ svntest.main.svnrdump_crosscheck_authentication)
+
+ repo_dir_r_d, repo_url_r_d = self.add_repo_path('load_r_d')
+ svntest.main.create_repos(repo_dir_r_d)
+ svntest.actions.run_and_verify_load(repo_dir_r_d, dumpfile_r_d)
+
+ # Dump the loaded repositories in the same way; expect exact equality
+ reloaded_dumpfile_a_n = svntest.actions.run_and_verify_dump(repo_dir_a_n)
+ reloaded_dumpfile_a_d = svntest.actions.run_and_verify_dump(repo_dir_a_d)
+ reloaded_dumpfile_r_d = svntest.actions.run_and_verify_dump(repo_dir_r_d)
+ svntest.verify.compare_dump_files(None, None,
+ reloaded_dumpfile_a_n,
+ reloaded_dumpfile_a_d,
+ ignore_uuid=True)
+ svntest.verify.compare_dump_files(None, None,
+ reloaded_dumpfile_a_d,
+ reloaded_dumpfile_r_d,
+ ignore_uuid=True)
+
+ # Run each dump through svndumpfilter and check for no further change.
+ for dumpfile in [dumpfile_a_n,
+ dumpfile_a_d,
+ dumpfile_r_d
+ ]:
+ ### No buffer size seems to work for update_tests-2. So skip that test?
+ ### (Its dumpfile size is ~360 KB non-delta, ~180 KB delta.)
+ if len(''.join(dumpfile)) > 100000:
+ continue
+
+ exit_code, dumpfile2, errput = svntest.main.run_command_stdin(
+ svntest.main.svndumpfilter_binary, None, -1, True,
+ dumpfile, '--quiet', 'include', '/')
+ assert not exit_code and not errput
+ # Ignore empty prop sections in the input file during comparison, as
+ # svndumpfilter strips them.
+ # Ignore differences in number of blank lines between node records,
+ # as svndumpfilter puts 3 instead of 2 after an add or delete record.
+ svntest.verify.compare_dump_files(None, None, dumpfile, dumpfile2,
+ expect_content_length_always=True,
+ ignore_empty_prop_sections=True,
+ ignore_number_of_blank_lines=True)
+
+ # Run the repository through 'svnsync' and check that this does not
+ # change the repository content. (Don't bother if it's already been
+ # created by svnsync.)
+ if "svn:sync-from-url\n" not in dumpfile_a_n:
+ dest_sbox = make_mirror(self)
+ verify_mirror(dest_sbox.repo_url, dest_sbox.repo_dir, dumpfile_a_n)
+
+ def verify(self, skip_cross_check=False):
+ """Do additional testing that should hold for any sandbox, such as
+ verifying that the repository can be dumped.
+ """
+ if (not skip_cross_check
+ and svntest.main.tests_verify_dump_load_cross_check()):
+ if self.is_built() and not self.read_only:
+ # verify that we can in fact dump the repo
+ # (except for the few tests that deliberately corrupt the repo)
+ os.chdir(self.was_cwd)
+ if os.path.exists(self.repo_dir):
+ logger.info("VERIFY: running dump/load cross-check")
+ self.verify_repo()
+ else:
+ logger.info("VERIFY: WARNING: skipping dump/load cross-check:"
+ " is-built=%s, read-only=%s"
+ % (self.is_built() and "true" or "false",
+ self.read_only and "true" or "false"))
+ pass
def is_url(target):
return (target.startswith('^/')