summaryrefslogtreecommitdiff
path: root/tools/examples/svnshell.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/examples/svnshell.py')
-rwxr-xr-xtools/examples/svnshell.py367
1 files changed, 367 insertions, 0 deletions
diff --git a/tools/examples/svnshell.py b/tools/examples/svnshell.py
new file mode 100755
index 0000000..9c67af4
--- /dev/null
+++ b/tools/examples/svnshell.py
@@ -0,0 +1,367 @@
+#!/usr/bin/env python
+#
+# svnshell.py : a Python-based shell interface for cruising 'round in
+# the filesystem.
+#
+######################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+######################################################################
+#
+
+import sys
+import time
+import re
+from cmd import Cmd
+from random import randint
+from svn import fs, core, repos
+
+
+class SVNShell(Cmd):
+ def __init__(self, path):
+ """initialize an SVNShell object"""
+ Cmd.__init__(self)
+ path = core.svn_path_canonicalize(path)
+ self.fs_ptr = repos.fs(repos.open(path))
+ self.is_rev = 1
+ self.rev = fs.youngest_rev(self.fs_ptr)
+ self.txn = None
+ self.root = fs.revision_root(self.fs_ptr, self.rev)
+ self.path = "/"
+ self._setup_prompt()
+ self.cmdloop()
+
+ def precmd(self, line):
+ if line == "EOF":
+ # Ctrl-D is a command without a newline. Print a newline, so the next
+ # shell prompt is not on the same line as the last svnshell prompt.
+ print("")
+ return "exit"
+ return line
+
+ def postcmd(self, stop, line):
+ self._setup_prompt()
+
+ _errors = ["Huh?",
+ "Whatchoo talkin' 'bout, Willis?",
+ "Say what?",
+ "Nope. Not gonna do it.",
+ "Ehh...I don't think so, chief."]
+
+ def default(self, line):
+ print(self._errors[randint(0, len(self._errors) - 1)])
+
+ def do_cat(self, arg):
+ """dump the contents of a file"""
+ if not len(arg):
+ print("You must supply a file path.")
+ return
+ catpath = self._parse_path(arg)
+ kind = fs.check_path(self.root, catpath)
+ if kind == core.svn_node_none:
+ print("Path '%s' does not exist." % catpath)
+ return
+ if kind == core.svn_node_dir:
+ print("Path '%s' is not a file." % catpath)
+ return
+ ### be nice to get some paging in here.
+ stream = fs.file_contents(self.root, catpath)
+ while True:
+ data = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE)
+ sys.stdout.write(data)
+ if len(data) < core.SVN_STREAM_CHUNK_SIZE:
+ break
+
+ def do_cd(self, arg):
+ """change directory"""
+ newpath = self._parse_path(arg)
+
+ # make sure that path actually exists in the filesystem as a directory
+ kind = fs.check_path(self.root, newpath)
+ if kind != core.svn_node_dir:
+ print("Path '%s' is not a valid filesystem directory." % newpath)
+ return
+ self.path = newpath
+
+ def do_ls(self, arg):
+ """list the contents of the current directory or provided path"""
+ parent = self.path
+ if not len(arg):
+ # no arg -- show a listing for the current directory.
+ entries = fs.dir_entries(self.root, self.path)
+ else:
+ # arg? show a listing of that path.
+ newpath = self._parse_path(arg)
+ kind = fs.check_path(self.root, newpath)
+ if kind == core.svn_node_dir:
+ parent = newpath
+ entries = fs.dir_entries(self.root, parent)
+ elif kind == core.svn_node_file:
+ parts = self._path_to_parts(newpath)
+ name = parts.pop(-1)
+ parent = self._parts_to_path(parts)
+ print(parent + ':' + name)
+ tmpentries = fs.dir_entries(self.root, parent)
+ if not tmpentries.get(name, None):
+ return
+ entries = {}
+ entries[name] = tmpentries[name]
+ else:
+ print("Path '%s' not found." % newpath)
+ return
+
+ keys = sorted(entries.keys())
+
+ print(" REV AUTHOR NODE-REV-ID SIZE DATE NAME")
+ print("----------------------------------------------------------------------------")
+
+ for entry in keys:
+ fullpath = parent + '/' + entry
+ size = ''
+ is_dir = fs.is_dir(self.root, fullpath)
+ if is_dir:
+ name = entry + '/'
+ else:
+ size = str(fs.file_length(self.root, fullpath))
+ name = entry
+ node_id = fs.unparse_id(entries[entry].id)
+ created_rev = fs.node_created_rev(self.root, fullpath)
+ author = fs.revision_prop(self.fs_ptr, created_rev,
+ core.SVN_PROP_REVISION_AUTHOR)
+ if not author:
+ author = ""
+ date = fs.revision_prop(self.fs_ptr, created_rev,
+ core.SVN_PROP_REVISION_DATE)
+ if not date:
+ date = ""
+ else:
+ date = self._format_date(date)
+
+ print("%6s %8s %12s %8s %12s %s" % (created_rev, author[:8],
+ node_id, size, date, name))
+
+ def do_lstxns(self, arg):
+ """list the transactions available for browsing"""
+ txns = sorted(fs.list_transactions(self.fs_ptr))
+ counter = 0
+ for txn in txns:
+ counter = counter + 1
+ sys.stdout.write("%8s " % txn)
+ if counter == 6:
+ print("")
+ counter = 0
+ print("")
+
+ def do_pcat(self, arg):
+ """list the properties of a path"""
+ catpath = self.path
+ if len(arg):
+ catpath = self._parse_path(arg)
+ kind = fs.check_path(self.root, catpath)
+ if kind == core.svn_node_none:
+ print("Path '%s' does not exist." % catpath)
+ return
+ plist = fs.node_proplist(self.root, catpath)
+ if not plist:
+ return
+ for pkey, pval in plist.items():
+ print('K ' + str(len(pkey)))
+ print(pkey)
+ print('P ' + str(len(pval)))
+ print(pval)
+ print('PROPS-END')
+
+ def do_setrev(self, arg):
+ """set the current revision to view"""
+ try:
+ if arg.lower() == 'head':
+ rev = fs.youngest_rev(self.fs_ptr)
+ else:
+ rev = int(arg)
+ newroot = fs.revision_root(self.fs_ptr, rev)
+ except:
+ print("Error setting the revision to '" + arg + "'.")
+ return
+ fs.close_root(self.root)
+ self.root = newroot
+ self.rev = rev
+ self.is_rev = 1
+ self._do_path_landing()
+
+ def do_settxn(self, arg):
+ """set the current transaction to view"""
+ try:
+ txnobj = fs.open_txn(self.fs_ptr, arg)
+ newroot = fs.txn_root(txnobj)
+ except:
+ print("Error setting the transaction to '" + arg + "'.")
+ return
+ fs.close_root(self.root)
+ self.root = newroot
+ self.txn = arg
+ self.is_rev = 0
+ self._do_path_landing()
+
+ def do_youngest(self, arg):
+ """list the youngest revision available for browsing"""
+ rev = fs.youngest_rev(self.fs_ptr)
+ print(rev)
+
+ def do_exit(self, arg):
+ sys.exit(0)
+
+ def _path_to_parts(self, path):
+ return [_f for _f in path.split('/') if _f]
+
+ def _parts_to_path(self, parts):
+ return '/' + '/'.join(parts)
+
+ def _parse_path(self, path):
+ # cleanup leading, trailing, and duplicate '/' characters
+ newpath = self._parts_to_path(self._path_to_parts(path))
+
+ # if PATH is absolute, use it, else append it to the existing path.
+ if path.startswith('/') or self.path == '/':
+ newpath = '/' + newpath
+ else:
+ newpath = self.path + '/' + newpath
+
+ # cleanup '.' and '..'
+ parts = self._path_to_parts(newpath)
+ finalparts = []
+ for part in parts:
+ if part == '.':
+ pass
+ elif part == '..':
+ if len(finalparts) != 0:
+ finalparts.pop(-1)
+ else:
+ finalparts.append(part)
+
+ # finally, return the calculated path
+ return self._parts_to_path(finalparts)
+
+ def _format_date(self, date):
+ date = core.svn_time_from_cstring(date)
+ date = time.asctime(time.localtime(date / 1000000))
+ return date[4:-8]
+
+ def _do_path_landing(self):
+ """try to land on self.path as a directory in root, failing up to '/'"""
+ not_found = 1
+ newpath = self.path
+ while not_found:
+ kind = fs.check_path(self.root, newpath)
+ if kind == core.svn_node_dir:
+ not_found = 0
+ else:
+ parts = self._path_to_parts(newpath)
+ parts.pop(-1)
+ newpath = self._parts_to_path(parts)
+ self.path = newpath
+
+ def _setup_prompt(self):
+ """present the prompt and handle the user's input"""
+ if self.is_rev:
+ self.prompt = "<rev: " + str(self.rev)
+ else:
+ self.prompt = "<txn: " + self.txn
+ self.prompt += " " + self.path + ">$ "
+
+ def _complete(self, text, line, begidx, endidx, limit_node_kind=None):
+ """Generic tab completer. Takes the 4 standard parameters passed to a
+ cmd.Cmd completer function, plus LIMIT_NODE_KIND, which should be a
+ svn.core.svn_node_foo constant to restrict the returned completions to, or
+ None for no limit. Catches and displays exceptions, because otherwise
+ they are silently ignored - which is quite frustrating when debugging!"""
+ try:
+ args = line.split()
+ if len(args) > 1:
+ arg = args[1]
+ else:
+ arg = ""
+ dirs = arg.split('/')
+ user_elem = dirs[-1]
+ user_dir = "/".join(dirs[:-1] + [''])
+
+ canon_dir = self._parse_path(user_dir)
+
+ entries = fs.dir_entries(self.root, canon_dir)
+ acceptable_completions = []
+ for name, dirent_t in entries.items():
+ if not name.startswith(user_elem):
+ continue
+ if limit_node_kind and dirent_t.kind != limit_node_kind:
+ continue
+ if dirent_t.kind == core.svn_node_dir:
+ name += '/'
+ acceptable_completions.append(name)
+ if limit_node_kind == core.svn_node_dir or not limit_node_kind:
+ if user_elem in ('.', '..'):
+ for extraname in ('.', '..'):
+ if extraname.startswith(user_elem):
+ acceptable_completions.append(extraname + '/')
+ return acceptable_completions
+ except:
+ ei = sys.exc_info()
+ sys.stderr.write("EXCEPTION WHILST COMPLETING\n")
+ import traceback
+ traceback.print_tb(ei[2])
+ sys.stderr.write("%s: %s\n" % (ei[0], ei[1]))
+ raise
+
+ def complete_cd(self, text, line, begidx, endidx):
+ return self._complete(text, line, begidx, endidx, core.svn_node_dir)
+
+ def complete_cat(self, text, line, begidx, endidx):
+ return self._complete(text, line, begidx, endidx, core.svn_node_file)
+
+ def complete_ls(self, text, line, begidx, endidx):
+ return self._complete(text, line, begidx, endidx)
+
+ def complete_pcat(self, text, line, begidx, endidx):
+ return self._complete(text, line, begidx, endidx)
+
+
+def _basename(path):
+ "Return the basename for a '/'-separated path."
+ idx = path.rfind('/')
+ if idx == -1:
+ return path
+ return path[idx+1:]
+
+
+def usage(exit):
+ if exit:
+ output = sys.stderr
+ else:
+ output = sys.stdout
+ output.write(
+ "usage: %s REPOS_PATH\n"
+ "\n"
+ "Once the program has started, type 'help' at the prompt for hints on\n"
+ "using the shell.\n" % sys.argv[0])
+ sys.exit(exit)
+
+def main():
+ if len(sys.argv) != 2:
+ usage(1)
+
+ SVNShell(sys.argv[1])
+
+if __name__ == '__main__':
+ main()