From 55084970e49c839cee9b2c7337f2905bc7950391 Mon Sep 17 00:00:00 2001 From: Zearin Date: Fri, 7 Oct 2011 12:55:49 -0400 Subject: Moved code from cmd2.py into errors.py, parsers.py, support.py, and tests.py. Also marked items for future tasks, tweaked docstrings, tweaked code to make repetitive statements read easier, and grouped all `do_` methods together inside the Cmd class. --- cmd2.py | 1455 ++++++++++++++++++++++-------------------------------------- errors.py | 132 +++++- parsers.py | 59 ++- support.py | 356 ++++++++++++++- tests.py | 115 ++++- 5 files changed, 1189 insertions(+), 928 deletions(-) diff --git a/cmd2.py b/cmd2.py index 7bd364c..b2b9cd6 100755 --- a/cmd2.py +++ b/cmd2.py @@ -1,28 +1,30 @@ -"""Variant on standard library's cmd with extra features. - -To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you -were using the standard library's cmd, while enjoying the extra features. - -Searchable command history (commands: "hi", "li", "run") -Load commands from file, save to file, edit commands in file -Multi-line commands -Case-insensitive commands -Special-character shortcut commands (beyond cmd's "@" and "!") -Settable environment parameters -Optional _onchange_{paramname} called when environment parameter changes -Parsing commands with `optparse` options (flags) -Redirection to file with >, >>; input from file with < -Easy transcript-based testing of applications (see example/example.py) -Bash-style ``select`` available - -Note that redirection with > and | will only work if `self.stdout.write()` -is used in place of `print`. The standard library's `cmd` module is -written to use `self.stdout.write()`, +'''Variant on standard library's cmd with extra features. + +Simply import cmd2.Cmd instead of cmd.Cmd. Use exactly as you would +use the standard library cmd, while enjoying the extra features. + + • Searchable command history (commands: "hi", "li", "run") + • Load commands from file, save to file, edit commands in file + • Multi-line commands + • Case-insensitive commands + • Special-character shortcut commands (beyond cmd's "@" and "!") + • Settable environment parameters + • Optional _onchange_{paramname} called when environment parameter changes + • Parsing commands with `optparse` options (flags) + • Redirection to file with >, >>; input from file with < + • Easy transcript-based testing of applications (see example/example.py) + • Bash-style ``select`` available + +Note: + Redirection with > and | will only work if `self.stdout.write()` + is used instead of `print`. The standard library's `cmd` module is + written to use `self.stdout.write()`. - Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com -mercurial repository at http://www.assembla.com/wiki/show/python-cmd2 -""" +Mercurial repository: + http://www.assembla.com/wiki/show/python-cmd2 +''' import cmd @@ -40,385 +42,130 @@ import glob import traceback import platform import copy -from code import InteractiveConsole, InteractiveInterpreter -from optparse import make_option +from code import InteractiveConsole, InteractiveInterpreter +from optparse import make_option import pyparsing -__version__ = '0.6.4' -if sys.version_info[0] == 2: - pyparsing.ParserElement.enablePackrat() +# Local imports +from . import commands, \ + errors, \ + parsers, \ + support, \ + tests +__version__ = '0.7.0' # Refactoring constitutes a version bump...right? +if sys.version_info[0] == 2: + pyparsing.ParserElement.enablePackrat() # Packrat is causing Python3 errors that I don't understand. -# +# # > /usr/local/Cellar/python3/3.2/lib/python3.2/site-packages/pyparsing-1.5.6-py3.2.egg/pyparsing.py(999)scanString() # -> nextLoc,tokens = parseFn( instring, preloc, callPreParse=False ) # (Pdb) n # NameError: global name 'exc' is not defined -# +# # (Pdb) parseFn # -# +# # Bug report filed: https://sourceforge.net/tracker/?func=detail&atid=617311&aid=3381439&group_id=97203 -class OptionParser(optparse.OptionParser): - def exit(self, status=0, msg=None): - self.values._exit = True - if msg: - print (msg) - - def print_help(self, *args, **kwargs): - try: - print (self._func.__doc__) - except AttributeError: - pass - optparse.OptionParser.print_help(self, *args, **kwargs) - - def error(self, msg): - """error(msg : string) - - Print a usage message incorporating 'msg' to stderr and exit. - If you override this in a subclass, it should not return -- it - should either exit or raise an exception. - """ - raise optparse.OptParseError(msg) - -def remaining_args(oldArgs, newArgList): - ''' - Preserves the spacing originally in the argument after - the removal of options. - - >>> remaining_args('-f bar bar cow', ['bar', 'cow']) - 'bar cow' - ''' - pattern = '\s+'.join(re.escape(a) for a in newArgList) + '\s*$' - matchObj = re.search(pattern, oldArgs) - return oldArgs[matchObj.start():] - -def _attr_get_(obj, attr): - '''Returns an attribute's value, or None (no error) if undefined. - Analagous to .get() for dictionaries. Useful when checking for - value of options that may not have been defined on a given - method.''' - try: - return getattr(obj, attr) - except AttributeError: - return None - -optparse.Values.get = _attr_get_ - -options_defined = [] # used to distinguish --options from SQL-style --comments - -def options(option_list, arg_desc="arg"): - '''Used as a decorator and passed a list of optparse-style options, - alters a cmd2 method to populate its ``opts`` argument from its - raw text argument. - - Example: transform - def do_something(self, arg): - - into - @options([make_option('-q', '--quick', action="store_true", - help="Makes things fast")], - "source dest") - def do_something(self, arg, opts): - if opts.quick: - self.fast_button = True - ''' - if not isinstance(option_list, list): - option_list = [option_list] - for opt in option_list: - options_defined.append(pyparsing.Literal(opt.get_opt_string())) - def option_setup(func): - optionParser = OptionParser() - for opt in option_list: - optionParser.add_option(opt) - optionParser.set_usage("%s [options] %s" % (func.__name__[3:], arg_desc)) - optionParser._func = func - def new_func(instance, arg): - try: - opts, newArgList = optionParser.parse_args(arg.split()) - # Must find the remaining args in the original argument list, but - # mustn't include the command itself - #if hasattr(arg, 'parsed') and newArgList[0] == arg.parsed.command: - # newArgList = newArgList[1:] - newArgs = remaining_args(arg, newArgList) - if isinstance(arg, ParsedString): - arg = arg.with_args_replaced(newArgs) - else: - arg = newArgs - except optparse.OptParseError, e: - print (e) - optionParser.print_help() - return - if hasattr(opts, '_exit'): - return None - result = func(instance, arg, opts) - return result - new_func.__doc__ = '%s\n%s' % (func.__doc__, optionParser.format_help()) - return new_func - return option_setup - -class PasteBufferError(EnvironmentError): - if sys.platform[:3] == 'win': - errmsg = """Redirecting to or from paste buffer requires pywin32 -to be installed on operating system. -Download from http://sourceforge.net/projects/pywin32/""" - elif sys.platform[:3] == 'dar': - # Use built in pbcopy on Mac OSX - pass - else: - errmsg = """Redirecting to or from paste buffer requires xclip -to be installed on operating system. -On Debian/Ubuntu, 'sudo apt-get install xclip' will install it.""" - def __init__(self): - Exception.__init__(self, self.errmsg) - -pastebufferr = """Redirecting to or from paste buffer requires %s -to be installed on operating system. -%s""" - -if subprocess.mswindows: - try: - import win32clipboard - def get_paste_buffer(): - win32clipboard.OpenClipboard(0) - try: - result = win32clipboard.GetClipboardData() - except TypeError: - result = '' #non-text - win32clipboard.CloseClipboard() - return result - def write_to_paste_buffer(txt): - win32clipboard.OpenClipboard(0) - win32clipboard.EmptyClipboard() - win32clipboard.SetClipboardText(txt) - win32clipboard.CloseClipboard() - except ImportError: - def get_paste_buffer(*args): - raise OSError, pastebufferr % ('pywin32', 'Download from http://sourceforge.net/projects/pywin32/') - write_to_paste_buffer = get_paste_buffer -elif sys.platform == 'darwin': - can_clip = False - try: - # test for pbcopy - AFAIK, should always be installed on MacOS - subprocess.check_call('pbcopy -help', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - can_clip = True - except (subprocess.CalledProcessError, OSError, IOError): - pass - if can_clip: - def get_paste_buffer(): - pbcopyproc = subprocess.Popen('pbcopy -help', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - return pbcopyproc.stdout.read() - def write_to_paste_buffer(txt): - pbcopyproc = subprocess.Popen('pbcopy', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - pbcopyproc.communicate(txt.encode()) - else: - def get_paste_buffer(*args): - raise OSError, pastebufferr % ('pbcopy', 'On MacOS X - error should not occur - part of the default installation') - write_to_paste_buffer = get_paste_buffer -else: - can_clip = False - try: - subprocess.check_call('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - can_clip = True - except AttributeError: # check_call not defined, Python < 2.5 - try: - teststring = 'Testing for presence of xclip.' - xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - xclipproc.stdin.write(teststring) - xclipproc.stdin.close() - xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - if xclipproc.stdout.read() == teststring: - can_clip = True - except Exception: # hate a bare Exception call, but exception classes vary too much b/t stdlib versions - pass - except Exception: - pass # something went wrong with xclip and we cannot use it - if can_clip: - def get_paste_buffer(): - xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - return xclipproc.stdout.read() - def write_to_paste_buffer(txt): - xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - xclipproc.stdin.write(txt.encode()) - xclipproc.stdin.close() - # but we want it in both the "primary" and "mouse" clipboards - xclipproc = subprocess.Popen('xclip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - xclipproc.stdin.write(txt.encode()) - xclipproc.stdin.close() - else: - def get_paste_buffer(*args): - raise OSError, pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"') - write_to_paste_buffer = get_paste_buffer - -pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') - -class ParsedString(str): - def full_parsed_statement(self): - new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args)) - new.parsed = self.parsed - new.parser = self.parser - return new - def with_args_replaced(self, newargs): - new = ParsedString(newargs) - new.parsed = self.parsed - new.parser = self.parser - new.parsed['args'] = newargs - new.parsed.statement['args'] = newargs - return new - -class StubbornDict(dict): - '''Dictionary that tolerates many input formats. - Create it with stubbornDict(arg) factory function. - - >>> d = StubbornDict(large='gross', small='klein') - >>> sorted(d.items()) - [('large', 'gross'), ('small', 'klein')] - >>> d.append(['plain', ' plaid']) - >>> sorted(d.items()) - [('large', 'gross'), ('plaid', ''), ('plain', ''), ('small', 'klein')] - >>> d += ' girl Frauelein, Maedchen\\n\\n shoe schuh' - >>> sorted(d.items()) - [('girl', 'Frauelein, Maedchen'), ('large', 'gross'), ('plaid', ''), ('plain', ''), ('shoe', 'schuh'), ('small', 'klein')] - ''' - def update(self, arg): - dict.update(self, StubbornDict.to_dict(arg)) - append = update - def __iadd__(self, arg): - self.update(arg) - return self - def __add__(self, arg): - selfcopy = copy.copy(self) - selfcopy.update(stubbornDict(arg)) - return selfcopy - def __radd__(self, arg): - selfcopy = copy.copy(self) - selfcopy.update(stubbornDict(arg)) - return selfcopy - - @classmethod - def to_dict(cls, arg): - 'Generates dictionary from string or list of strings' - if hasattr(arg, 'splitlines'): - arg = arg.splitlines() - if hasattr(arg, '__reversed__'): - result = {} - for a in arg: - a = a.strip() - if a: - key_val = a.split(None, 1) - key = key_val[0] - if len(key_val) > 1: - val = key_val[1] - else: - val = '' - result[key] = val - else: - result = arg - return result - -def stubbornDict(*arg, **kwarg): - ''' - >>> sorted(stubbornDict('cow a bovine\\nhorse an equine').items()) - [('cow', 'a bovine'), ('horse', 'an equine')] - >>> sorted(stubbornDict(['badger', 'porcupine a poky creature']).items()) - [('badger', ''), ('porcupine', 'a poky creature')] - >>> sorted(stubbornDict(turtle='has shell', frog='jumpy').items()) - [('frog', 'jumpy'), ('turtle', 'has shell')] - ''' - result = {} - for a in arg: - result.update(StubbornDict.to_dict(a)) - result.update(kwarg) - return StubbornDict(result) - -def replace_with_file_contents(fname): - if fname: - try: - result = open(os.path.expanduser(fname[0])).read() - except IOError: - result = '< %s' % fname[0] # wasn't a file after all - else: - result = get_paste_buffer() - return result - -class EmbeddedConsoleExit(SystemExit): - pass - -class EmptyStatement(Exception): - pass - -def ljust(x, width, fillchar=' '): - 'analogous to str.ljust, but works for lists' - if hasattr(x, 'ljust'): - return x.ljust(width, fillchar) - else: - if len(x) < width: - x = (x + [fillchar] * width)[:width] - return x - class Cmd(cmd.Cmd): - echo = False - case_insensitive = True # Commands recognized regardless of case - continuation_prompt = '> ' - timing = False # Prints elapsed time for each command + echo = False + case_insensitive = True # Commands recognized regardless of case + continuation_prompt = '> ' + timing = False # Prints elapsed time for each command # make sure your terminators are not in legalChars! - legalChars = u'!#$%.:?@_' + pyparsing.alphanums + pyparsing.alphas8bit - shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} - excludeFromHistory = '''run r list l history hi ed edit li eof'''.split() - default_to_shell = False - noSpecialParse = 'set ed edit exit'.split() - defaultExtension = 'txt' # For ``save``, ``load``, etc. - default_file_name = 'command.txt' # For ``save``, ``load``, etc. - abbrev = True # Abbreviated commands recognized - current_script_dir = None - reserved_words = [] - feedback_to_output = False # Do include nonessentials in >, | output - quiet = False # Do not suppress nonessential output - debug = False - locals_in_py = True - kept_state = None - redirector = '>' # for sending output to file + legalChars = u'!#$%.:?@_' + pyparsing.alphanums + pyparsing.alphas8bit + shortcuts = { '?': 'help', + '!': 'shell', + '@': 'load', + '@@': '_relative_load' } + + excludeFromHistory = '''run r list l history hi ed edit li eof'''.split() + default_to_shell = False + noSpecialParse = 'set ed edit exit'.split() + defaultExtension = 'txt' # For ``save``, ``load``, etc. + default_file_name = 'command.txt' # For ``save``, ``load``, etc. + abbrev = True # Abbreviated commands recognized + current_script_dir = None + reserved_words = [] + feedback_to_output = False # Include nonessentials in >, | output + quiet = False # Do not suppress nonessential output + debug = False + locals_in_py = True + kept_state = None + redirector = '>' # For redirecting output to a file + settable = stubbornDict(''' prompt colors Colorized output (*nix only) continuation_prompt On 2nd+ line of input debug Show full error stack on error default_file_name for ``save``, ``load``, etc. - editor Program used by ``edit`` + editor Program used by ``edit`` case_insensitive upper- and lower-case both OK - feedback_to_output include nonessentials in `|`, `>` results + feedback_to_output include nonessentials in `|`, `>` results quiet Don't print nonessential feedback echo Echo command issued into output timing Report execution times abbrev Accept abbreviated commands ''') - + + + def __init__(self, *args, **kwargs): + # @FIXME + # + # Add docstring describing what gets __init__'ed, + # and what Cmd2 users might want to do with + # __init__ in their own Cmd2-based program. + # + cmd.Cmd.__init__(self, *args, **kwargs) + self.initial_stdout = sys.stdout + self.history = History() + self.pystate = {} + self.shortcuts = sorted(self.shortcuts.items(), reverse=True) + self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) + if fname.startswith('do_')] + self._init_parser() + + def poutput(self, msg): - '''Convenient shortcut for self.stdout.write(); adds newline if necessary.''' + ''' + Convenience shortcut for self.stdout.write(). Adds newline if necessary. + ''' if msg: self.stdout.write(msg) if msg[-1] != '\n': self.stdout.write('\n') + + def perror(self, errmsg, statement=None): if self.debug: traceback.print_exc() print (str(errmsg)) + + def pfeedback(self, msg): - """For printing nonessential feedback. Can be silenced with `quiet`. - Inclusion in redirected output is controlled by `feedback_to_output`.""" + ''' + Prints nonessential feedback. Can be silenced with `quiet`. + + Inclusion in redirected output is controlled by `feedback_to_output`. + ''' if not self.quiet: if self.feedback_to_output: self.poutput(msg) else: print (msg) - _STOP_AND_EXIT = True # distinguish end of script file from actual exit + + + _STOP_AND_EXIT = True # distinguish script file's end from actual exit _STOP_SCRIPT_NO_EXIT = -999 + editor = os.environ.get('EDITOR') + if not editor: if sys.platform[:3] == 'win': editor = 'notepad' @@ -427,72 +174,53 @@ class Cmd(cmd.Cmd): if subprocess.Popen(['which', editor], stdout=subprocess.PIPE).communicate()[0]: break - colorcodes = {'bold':{True:'\x1b[1m',False:'\x1b[22m'}, - 'cyan':{True:'\x1b[36m',False:'\x1b[39m'}, - 'blue':{True:'\x1b[34m',False:'\x1b[39m'}, - 'red':{True:'\x1b[31m',False:'\x1b[39m'}, - 'magenta':{True:'\x1b[35m',False:'\x1b[39m'}, - 'green':{True:'\x1b[32m',False:'\x1b[39m'}, - 'underline':{True:'\x1b[4m',False:'\x1b[24m'}} + colorcodes = { 'bold' :{True:'\x1b[1m' ,False:'\x1b[22m'}, + 'cyan' :{True:'\x1b[36m',False:'\x1b[39m'}, + 'blue' :{True:'\x1b[34m',False:'\x1b[39m'}, + 'red' :{True:'\x1b[31m',False:'\x1b[39m'}, + 'magenta' :{True:'\x1b[35m',False:'\x1b[39m'}, + 'green' :{True:'\x1b[32m',False:'\x1b[39m'}, + 'underline' :{True:'\x1b[4m' ,False:'\x1b[24m'} + } + colors = (platform.system() != 'Windows') + + def colorize(self, val, color): - '''Given a string (``val``), returns that string wrapped in UNIX-style - special characters that turn on (and then off) text color and style. - If the ``colors`` environment paramter is ``False``, or the application - is running on Windows, will return ``val`` unchanged. - ``color`` should be one of the supported strings (or styles): - red/blue/green/cyan/magenta, bold, underline''' + ''' + Wraps the provided string in UNIX-style special characters that + turn on (and then off) text color and style. + + The string is returned unchanged if the ``colors`` environment variable + is ``False`` or the application is running on Windows. + + ``color`` should be one of the supported strings (or styles): + red/blue/green/cyan/magenta, bold, underline + ''' if self.colors and (self.stdout == self.initial_stdout): return self.colorcodes[color][True] + val + self.colorcodes[color][False] return val - def do_cmdenvironment(self, args): - '''Summary report of interactive parameters.''' - self.stdout.write(""" - Commands are %(casesensitive)scase-sensitive. - Commands may be terminated with: %(terminators)s - Settable parameters: %(settable)s\n""" % \ - { 'casesensitive': (self.case_insensitive and 'not ') or '', - 'terminators': str(self.terminators), - 'settable': ' '.join(self.settable) - }) - - def do_help(self, arg): - if arg: - funcname = self.func_named(arg) - if funcname: - fn = getattr(self, funcname) - try: - fn.optionParser.print_help(file=self.stdout) - except AttributeError: - cmd.Cmd.do_help(self, funcname[3:]) - else: - cmd.Cmd.do_help(self, arg) - - def __init__(self, *args, **kwargs): - cmd.Cmd.__init__(self, *args, **kwargs) - self.initial_stdout = sys.stdout - self.history = History() - self.pystate = {} - self.shortcuts = sorted(self.shortcuts.items(), reverse=True) - self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) - if fname.startswith('do_')] - self._init_parser() - - def do_shortcuts(self, args): - """Lists single-key shortcuts available.""" - result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) - self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) - prefixParser = pyparsing.Empty() - commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment]) + prefixParser = pyparsing.Empty() + commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment]) commentGrammars.addParseAction(lambda x: '') - commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo( - pyparsing.stringEnd ^ '*/') - terminators = [';'] - blankLinesAllowed = False - multilineCommands = [] - + commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd ^ '*/') + terminators = [';'] + blankLinesAllowed = False + multilineCommands = [] + + + # @TODO + # Could this be moved to parser.py? + # + # That WOULD move it outside this class. But this is an + # unbelievably huge chunk block of code dedicated to parsing. + # My gut instinct is to keep such large codeblocks separated according + # to their role. + # + # So, that would mean this method lives with other parsing stuff + # in parsers.py. def _init_parser(self): r''' >>> c = Cmd() @@ -502,21 +230,21 @@ class Cmd(cmd.Cmd): >>> print (c.parser.parseString('').dump()) [] >>> print (c.parser.parseString('').dump()) - [] + [] >>> print (c.parser.parseString('/* empty command */').dump()) - [] + [] >>> print (c.parser.parseString('plainword').dump()) ['plainword', ''] - command: plainword - statement: ['plainword', ''] - - command: plainword + - command: plainword >>> print (c.parser.parseString('termbare;').dump()) ['termbare', '', ';', ''] - command: termbare - statement: ['termbare', '', ';'] - command: termbare - terminator: ; - - terminator: ; + - terminator: ; >>> print (c.parser.parseString('termbare; suffx').dump()) ['termbare', '', ';', 'suffx'] - command: termbare @@ -524,7 +252,7 @@ class Cmd(cmd.Cmd): - command: termbare - terminator: ; - suffix: suffx - - terminator: ; + - terminator: ; >>> print (c.parser.parseString('barecommand').dump()) ['barecommand', ''] - command: barecommand @@ -579,7 +307,7 @@ class Cmd(cmd.Cmd): - outputTo: afile.txt - statement: ['output', 'into'] - args: into - - command: output + - command: output >>> print (c.parser.parseString('output into;sufx | pipethrume plz > afile.txt').dump()) ['output', 'into', ';', 'sufx', '|', ' pipethrume plz', '>', 'afile.txt'] - args: into @@ -618,10 +346,10 @@ class Cmd(cmd.Cmd): - args: > inside - command: has - terminator: ; - - terminator: ; + - terminator: ; >>> print (c.parser.parseString('multiline has > inside an unfinished command').dump()) ['multiline', ' has > inside an unfinished command'] - - multilineCommand: multiline + - multilineCommand: multiline >>> print (c.parser.parseString('multiline has > inside;').dump()) ['multiline', 'has > inside', ';', ''] - args: has > inside @@ -630,7 +358,7 @@ class Cmd(cmd.Cmd): - args: has > inside - multilineCommand: multiline - terminator: ; - - terminator: ; + - terminator: ; >>> print (c.parser.parseString('multiline command /* with comment in progress;').dump()) ['multiline', ' command /* with comment in progress;'] - multilineCommand: multiline @@ -670,20 +398,26 @@ class Cmd(cmd.Cmd): - command: what ''' #outputParser = (pyparsing.Literal('>>') | (pyparsing.WordStart() + '>') | pyparsing.Regex('[^=]>'))('output') - outputParser = (pyparsing.Literal(self.redirector *2) | \ - (pyparsing.WordStart() + self.redirector) | \ - pyparsing.Regex('[^=]' + self.redirector))('output') - - terminatorParser = pyparsing.Or([(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in self.terminators])('terminator') - stringEnd = pyparsing.stringEnd ^ '\nEOF' + outputParser = (pyparsing.Literal(self.redirector *2) | \ + (pyparsing.WordStart() + self.redirector) | \ + pyparsing.Regex('[^=]' + self.redirector))('output') + + terminatorParser = pyparsing.Or([(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in self.terminators])('terminator') + stringEnd = pyparsing.stringEnd ^ '\nEOF' self.multilineCommand = pyparsing.Or([pyparsing.Keyword(c, caseless=self.case_insensitive) for c in self.multilineCommands])('multilineCommand') - oneLineCommand = (~self.multilineCommand + pyparsing.Word(self.legalChars))('command') - pipe = pyparsing.Keyword('|', identChars='|') + oneLineCommand = (~self.multilineCommand + pyparsing.Word(self.legalChars))('command') + pipe = pyparsing.Keyword('|', identChars='|') self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '') - doNotParse = self.commentGrammars | self.commentInProgress | pyparsing.quotedString - afterElements = \ - pyparsing.Optional(pipe + pyparsing.SkipTo(outputParser ^ stringEnd, ignore=doNotParse)('pipeTo')) + \ - pyparsing.Optional(outputParser + pyparsing.SkipTo(stringEnd, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('outputTo')) + doNotParse = self.commentGrammars | self.commentInProgress | pyparsing.quotedString + afterElements = \ + pyparsing.Optional( \ + pipe + \ + pyparsing.SkipTo(outputParser ^ stringEnd, ignore=doNotParse)('pipeTo') + ) + \ + pyparsing.Optional( \ + outputParser + \ + pyparsing.SkipTo(stringEnd, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('outputTo') + ) if self.case_insensitive: self.multilineCommand.setParseAction(lambda x: x[0].lower()) oneLineCommand.setParseAction(lambda x: x[0].lower()) @@ -693,39 +427,48 @@ class Cmd(cmd.Cmd): self.blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator') self.blankLineTerminator.setResultsName('terminator') self.blankLineTerminationParser = ((self.multilineCommand ^ oneLineCommand) + pyparsing.SkipTo(self.blankLineTerminator, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('args') + self.blankLineTerminator)('statement') - self.multilineParser = (((self.multilineCommand ^ oneLineCommand) + pyparsing.SkipTo(terminatorParser, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('args') + terminatorParser)('statement') + + self.multilineParser = (((self.multilineCommand ^ oneLineCommand) + pyparsing.SkipTo(terminatorParser, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('args') + terminatorParser)('statement') + pyparsing.SkipTo(outputParser ^ pipe ^ stringEnd, ignore=doNotParse).setParseAction(lambda x: x[0].strip())('suffix') + afterElements) self.multilineParser.ignore(self.commentInProgress) - self.singleLineParser = ((oneLineCommand + pyparsing.SkipTo(terminatorParser ^ stringEnd ^ pipe ^ outputParser, ignore=doNotParse).setParseAction(lambda x:x[0].strip())('args'))('statement') + + self.singleLineParser = ((oneLineCommand + pyparsing.SkipTo(terminatorParser ^ stringEnd ^ pipe ^ outputParser, ignore=doNotParse).setParseAction(lambda x:x[0].strip())('args'))('statement') + pyparsing.Optional(terminatorParser) + afterElements) - #self.multilineParser = self.multilineParser.setResultsName('multilineParser') - #self.singleLineParser = self.singleLineParser.setResultsName('singleLineParser') + #self.multilineParser = self.multilineParser.setResultsName('multilineParser') + #self.singleLineParser = self.singleLineParser.setResultsName('singleLineParser') self.blankLineTerminationParser = self.blankLineTerminationParser.setResultsName('statement') self.parser = self.prefixParser + ( stringEnd | self.multilineParser | self.singleLineParser | - self.blankLineTerminationParser | + self.blankLineTerminationParser | self.multilineCommand + pyparsing.SkipTo(stringEnd, ignore=doNotParse) ) self.parser.ignore(self.commentGrammars) - - inputMark = pyparsing.Literal('<') + + inputMark = pyparsing.Literal('<') inputMark.setParseAction(lambda x: '') - fileName = pyparsing.Word(self.legalChars + '/\\') - inputFrom = fileName('inputFrom') + fileName = pyparsing.Word(self.legalChars + '/\\') + inputFrom = fileName('inputFrom') inputFrom.setParseAction(replace_with_file_contents) - # a not-entirely-satisfactory way of distinguishing < as in "import from" from < - # as in "lesser than" - self.inputParser = inputMark + pyparsing.Optional(inputFrom) + pyparsing.Optional('>') + \ - pyparsing.Optional(fileName) + (pyparsing.stringEnd | '|') - self.inputParser.ignore(self.commentInProgress) - + + # A not-entirely-satisfactory way of distinguishing < as in "import from" + # from < as in "lesser than" + self.inputParser = inputMark + \ + pyparsing.Optional(inputFrom) + \ + pyparsing.Optional('>') + \ + pyparsing.Optional(fileName) + \ + (pyparsing.stringEnd | '|') + + self.inputParser.ignore(self.commentInProgress) + + def preparse(self, raw, **kwargs): return raw + + def postparse(self, parseResult): return parseResult - + + def parsed(self, raw, **kwargs): if isinstance(raw, ParsedString): p = raw @@ -739,8 +482,8 @@ class Cmd(cmd.Cmd): s = s.replace(shortcut, expansion + ' ', 1) break result = self.parser.parseString(s) - result['raw'] = raw - result['command'] = result.multilineCommand or result.command + result['raw'] = raw + result['command'] = result.multilineCommand or result.command result = self.postparse(result) p = ParsedString(result.args) p.parsed = result @@ -748,13 +491,17 @@ class Cmd(cmd.Cmd): for (key, val) in kwargs.items(): p.parsed[key] = val return p - + + def postparsing_precmd(self, statement): stop = 0 return stop, statement + + def postparsing_postcmd(self, stop): return stop - + + def func_named(self, arg): result = None target = 'do_' + arg @@ -766,10 +513,15 @@ class Cmd(cmd.Cmd): if len(funcs) == 1: result = 'do_' + funcs[0] return result + + def onecmd_plus_hooks(self, line): # The outermost level of try/finally nesting can be condensed once # Python 2.4 support can be dropped. + # + # @TODO Do you think we can safely drop Python 2.4 support yet? :-) stop = 0 + try: try: statement = self.complete_statement(line) @@ -777,7 +529,7 @@ class Cmd(cmd.Cmd): if stop: return self.postparsing_postcmd(stop) if statement.parsed.command not in self.excludeFromHistory: - self.history.append(statement.parsed.raw) + self.history.append(statement.parsed.raw) try: self.redirect_output(statement) timestart = datetime.datetime.now() @@ -791,24 +543,29 @@ class Cmd(cmd.Cmd): except EmptyStatement: return 0 except Exception, e: - self.perror(str(e), statement) + self.perror(str(e), statement) finally: - return self.postparsing_postcmd(stop) + return self.postparsing_postcmd(stop) + + def complete_statement(self, line): - """Keep accepting lines of input until the command is complete.""" + ''' + Continue accepting lines of input until the command is complete. + ''' if (not line) or ( not pyparsing.Or(self.commentGrammars). setParseAction(lambda x: '').transformString(line)): raise EmptyStatement statement = self.parsed(line) while statement.parsed.multilineCommand and (statement.parsed.terminator == ''): - statement = '%s\n%s' % (statement.parsed.raw, - self.pseudo_raw_input(self.continuation_prompt)) + statement = '%s\n%s' % (statement.parsed.raw, + self.pseudo_raw_input(self.continuation_prompt)) statement = self.parsed(statement) if not statement.parsed.command: raise EmptyStatement return statement - + + def redirect_output(self, statement): if statement.parsed.pipeTo: self.kept_state = Statekeeper(self, ('stdout',)) @@ -818,18 +575,21 @@ class Cmd(cmd.Cmd): elif statement.parsed.output: if (not statement.parsed.outputTo) and (not can_clip): raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') - self.kept_state = Statekeeper(self, ('stdout',)) + + self.kept_state = Statekeeper(self, ('stdout',)) self.kept_sys = Statekeeper(sys, ('stdout',)) + if statement.parsed.outputTo: mode = 'w' if statement.parsed.output == 2 * self.redirector: mode = 'a' - sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode) + sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode) else: sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") if statement.parsed.output == '>>': self.stdout.write(get_paste_buffer()) - + + def restore_output(self, statement): if self.kept_state: if statement.parsed.output: @@ -837,37 +597,42 @@ class Cmd(cmd.Cmd): self.stdout.seek(0) write_to_paste_buffer(self.stdout.read()) elif statement.parsed.pipeTo: - for result in self.redirect.communicate(): - self.kept_state.stdout.write(result or '') + for result in self.redirect.communicate(): + self.kept_state.stdout.write(result or '') self.stdout.close() - self.kept_state.restore() + self.kept_state.restore() self.kept_sys.restore() - self.kept_state = None - + self.kept_state = None + + def onecmd(self, line): - """Interpret the argument as though it had been typed in response + ''' + Interpret the argument as though it had been typed in response to the prompt. - This may be overridden, but should not normally need to be; - see the precmd() and postcmd() methods for useful execution hooks. + This may be overridden, but shouldn't normally need to be. + See the precmd() and postcmd() methods for useful execution hooks. + The return value is a flag indicating whether interpretation of commands by the interpreter should stop. - - This (`cmd2`) version of `onecmd` already override's `cmd`'s `onecmd`. - """ + This (`cmd2`) version of `onecmd` already override's `cmd`'s `onecmd`. + ''' statement = self.parsed(line) - self.lastcmd = statement.parsed.raw + self.lastcmd = statement.parsed.raw funcname = self.func_named(statement.parsed.command) + if not funcname: return self._default(statement) try: func = getattr(self, funcname) except AttributeError: return self._default(statement) - stop = func(statement) - return stop - + + stop = func(statement) + return stop + + def _default(self, statement): arg = statement.full_parsed_statement() if self.default_to_shell: @@ -876,9 +641,11 @@ class Cmd(cmd.Cmd): return self.postparsing_postcmd(None) return self.postparsing_postcmd(self.default(arg)) + def pseudo_raw_input(self, prompt): - """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" - + ''' + Copied from cmd's cmdloop. Similar to raw_input, but accounts for changed stdin/stdout. + ''' if self.use_rawinput: try: line = raw_input(prompt) @@ -892,19 +659,24 @@ class Cmd(cmd.Cmd): line = 'EOF' else: if line[-1] == '\n': # this was always true in Cmd - line = line[:-1] + line = line[:-1] return line - + + def _cmdloop(self, intro=None): - """Repeatedly issue a prompt, accept input, parse an initial prefix + ''' + Repeatedly issue a prompt, accept input, parse an initial prefix off the received input, and dispatch to action methods, passing them the remainder of the line as argument. - """ + ''' + + # An almost-perfect copy from Cmd. + # + # However, the `pseudo_raw_input` portion has been + # split out so that it can be called separately. - # An almost perfect copy from Cmd; however, the pseudo_raw_input portion - # has been split out so that it can be called separately - self.preloop() + if self.use_rawinput and self.completekey: try: import readline @@ -934,32 +706,28 @@ class Cmd(cmd.Cmd): import readline readline.set_completer(self.old_completer) except ImportError: - pass + pass return stop - def do_EOF(self, arg): - return self._STOP_SCRIPT_NO_EXIT # End of script; should not exit app - do_eof = do_EOF - - def do_quit(self, arg): - return self._STOP_AND_EXIT - do_exit = do_quit - do_q = do_quit - + def select(self, options, prompt='Your choice? '): - '''Presents a numbered menu to the user. Modelled after - the bash shell's SELECT. Returns the item chosen. - - Argument ``options`` can be: - - | a single string -> will be split into one-word options - | a list of strings -> will be offered as options - | a list of tuples -> interpreted as (value, text), so - that the return value can differ from - the text advertised to the user ''' + ''' + Presents a numbered menu to the user. Modeled after + the bash shell's SELECT. Returns the item chosen. + + Argument ``options`` can be: + + | a single string -> splits into one-word options + | a list of strings -> will be offered as options + | a list of tuples -> interpreted as (value, text), so + that the return value can differ from + the text advertised to the user + ''' if isinstance(options, basestring): options = zip(options.split(), options.split()) + fulloptions = [] + for opt in options: if isinstance(opt, basestring): fulloptions.append((opt, opt)) @@ -968,8 +736,10 @@ class Cmd(cmd.Cmd): fulloptions.append((opt[0], opt[1])) except IndexError: fulloptions.append((opt[0], opt[0])) + for (idx, (value, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx+1, text)) + while True: response = raw_input(prompt) try: @@ -978,12 +748,145 @@ class Cmd(cmd.Cmd): break except ValueError: pass # loop and ask again + return result - - @options([make_option('-l', '--long', action="store_true", - help="describe function of parameter")]) + + + def last_matching(self, arg): + try: + if arg: + return self.history.get(arg)[-1] + else: + return self.history[-1] + except IndexError: + return None + + + def read_file_or_url(self, fname): + # @TODO : not working on localhost + if isinstance(fname, file): + result = open(fname, 'r') + else: + match = self.urlre.match(fname) + if match: + result = urllib.urlopen(match.group(1)) + else: + fname = os.path.expanduser(fname) + try: + result = open(os.path.expanduser(fname), 'r') + except IOError: + result = open('%s.%s' % (os.path.expanduser(fname), + self.defaultExtension), 'r') + return result + + + def fileimport(self, statement, source): + # @FIXME + # Add docstring + try: + f = open(os.path.expanduser(source)) + except IOError: + self.stdout.write("Couldn't read from file %s\n" % source) + return '' + data = f.read() + f.close() + return data + + + def runTranscriptTests(self, callargs): + # @FIXME + # Add docstring + class TestMyAppCase(Cmd2TestCase): + CmdApp = self.__class__ + self.__class__.testfiles = callargs + sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() + testcase = TestMyAppCase() + runner = unittest.TextTestRunner() + result = runner.run(testcase) + result.printErrors() + + + def run_commands_at_invocation(self, callargs): + # @FIXME + # Add docstring + for initial_command in callargs: + if self.onecmd_plus_hooks(initial_command + '\n'): + return self._STOP_AND_EXIT + + + def do__relative_load(self, arg=None): + ''' + Runs commands in script at file or URL; if this is called from within an + already-running script, the filename will be interpreted relative to the + already-running script's directory. + ''' + if arg: + arg = arg.split(None, 1) + targetname, args = arg[0], (arg[1:] or [''])[0] + targetname = os.path.join(self.current_script_dir or '', targetname) + self.do__load('%s %s' % (targetname, args)) + + urlre = re.compile('(https?://[-\\w\\./]+)') + + def do_cmdenvironment(self, args): + ''' + Summary report of interactive parameters. + ''' + + self.stdout.write(""" + Commands are %(casesensitive)scase-sensitive. + Commands may be terminated with: %(terminators)s + + Settable parameters: %(settable)s\n""" % \ + { 'casesensitive' : (self.case_insensitive and 'not ') or '', + 'terminators' : str(self.terminators), + 'settable' : ' '.join(self.settable) + }) + + + def do_help(self, arg): + if arg: + funcname = self.func_named(arg) + if funcname: + fn = getattr(self, funcname) + try: + fn.optionParser.print_help(file=self.stdout) + except AttributeError: + cmd.Cmd.do_help(self, funcname[3:]) + else: + cmd.Cmd.do_help(self, arg) + + + def do_shortcuts(self, args): + ''' + Lists single-key shortcuts available. + ''' + result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) + self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) + + + def do_EOF(self, arg): + return self._STOP_SCRIPT_NO_EXIT # End of script; should not exit app + + do_eof = do_EOF + + + def do_quit(self, arg): + return self._STOP_AND_EXIT + + do_exit = do_quit + do_q = do_quit + + + @options( + [make_option( + '-l', '--long', + action ="store_true", + help ="describe function of parameter")]) def do_show(self, arg, opts): - '''Shows value of a parameter.''' + ''' + Show the value of a parameter. + ''' param = arg.strip().lower() result = {} maxlen = 0 @@ -999,16 +902,19 @@ class Cmd(cmd.Cmd): self.poutput(result[p]) else: raise NotImplementedError("Parameter '%s' not supported (type 'show' for list of parameters)." % param) - + + def do_set(self, arg): ''' - Sets a cmd2 parameter. Accepts abbreviated parameter names so long - as there is no ambiguity. Call without arguments for a list of - settable parameters with their values.''' + Set a cmd2 parameter. Accepts abbreviated parameter names + if there is no ambiguity. Lists settable parameters and values if + called with no arguments. + ''' try: statement, paramName, val = arg.parsed.raw.split(None, 2) val = val.strip() paramName = paramName.strip().lower() + if paramName not in self.settable: hits = [p for p in self.settable if p.startswith(paramName)] if len(hits) == 1: @@ -1016,12 +922,15 @@ class Cmd(cmd.Cmd): else: return self.do_show(paramName) currentVal = getattr(self, paramName) + if (val[0] == val[-1]) and val[0] in ("'", '"'): val = val[1:-1] - else: + else: val = cast(currentVal, val) + setattr(self, paramName, val) self.stdout.write('%s - was: %s\nnow: %s\n' % (paramName, currentVal, val)) + if currentVal != val: try: onchange_hook = getattr(self, '_onchange_%s' % paramName) @@ -1030,20 +939,27 @@ class Cmd(cmd.Cmd): pass except (ValueError, AttributeError, NotSettableError), e: self.do_show(arg) - + + def do_pause(self, arg): - 'Displays the specified text then waits for the user to press RETURN.' + '''Display the provided text, then wait for the user to press RETURN.''' raw_input(arg + '\n') - + + def do_shell(self, arg): - 'execute a command as if at the OS prompt.' + ''' + Executes a command as if at the OS prompt. + ''' os.system(arg) - - def do_py(self, arg): + + + def do_py(self, arg): ''' py : Executes a Python command. py: Enters interactive Python mode. + End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. + Non-python commands can be issued with ``cmd("your command")``. Run python code from external files with ``run("filename.py")`` ''' @@ -1052,6 +968,7 @@ class Cmd(cmd.Cmd): localvars = (self.locals_in_py and self.pystate) or {} interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') + if arg.strip(): interp.runcode(arg) else: @@ -1070,27 +987,34 @@ class Cmd(cmd.Cmd): self.pystate['exit'] = quit self.pystate['cmd'] = onecmd_plus_hooks self.pystate['run'] = run + try: - cprt = 'Type "help", "copyright", "credits" or "license" for more information.' - keepstate = Statekeeper(sys, ('stdin','stdout')) - sys.stdout = self.stdout - sys.stdin = self.stdin + cprt = 'Type "help", "copyright", "credits" or "license" for more information.' + keepstate = Statekeeper(sys, ('stdin','stdout')) + sys.stdout = self.stdout + sys.stdin = self.stdin interp.interact(banner= "Python %s on %s\n%s\n(%s)\n%s" % (sys.version, sys.platform, cprt, self.__class__.__name__, self.do_py.__doc__)) except EmbeddedConsoleExit: pass keepstate.restore() - - @options([make_option('-s', '--script', action="store_true", help="Script format; no separation lines"), - ], arg_desc = '(limit on which commands to include)') + + + @options( + [make_option( + '-s', '--script', + action = 'store_true', + help = 'Script format; no separation lines'),], + arg_desc = '(limit on which commands to include)') def do_history(self, arg, opts): - """history [arg]: lists past commands issued - - | no arg: list all - | arg is integer: list one history item, by index - | arg is string: string search - | arg is /enclosed in forward-slashes/: regular expression search - """ + ''' + history [arg]: lists past commands issued + + | no arg: list all + | arg is integer: list one history item, by index + | arg is string: string search + | /arg enclosed slashes/: regular expression search + ''' if arg: history = self.history.get(arg) else: @@ -1100,23 +1024,17 @@ class Cmd(cmd.Cmd): self.poutput(hi) else: self.stdout.write(hi.pr()) - def last_matching(self, arg): - try: - if arg: - return self.history.get(arg)[-1] - else: - return self.history[-1] - except IndexError: - return None + + def do_list(self, arg): - """list [arg]: lists last command issued - - no arg -> list most recent command - arg is integer -> list one history item, by index - a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) - arg is string -> list all commands matching string search - arg is /enclosed in forward-slashes/ -> regular expression search - """ + '''list [arg]: lists last command issued + + no arg -> list most recent command + arg is integer -> list one history item, by index + a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) + arg is string -> list all commands matching string search + /arg enclosed in slashes/ -> regular expression search + ''' try: history = self.history.span(arg or '-1') except IndexError: @@ -1124,18 +1042,22 @@ class Cmd(cmd.Cmd): for hi in history: self.poutput(hi.pr()) - do_hi = do_history - do_l = do_list - do_li = do_list - + do_hi = do_history + do_l = do_list + do_li = do_list + + def do_ed(self, arg): - """ed: edit most recent command in text editor - ed [N]: edit numbered command from history - ed [filename]: edit specified file name - - commands are run after editor is closed. - "set edit (program-name)" or set EDITOR environment variable - to control which editing program is used.""" + ''' + ed: edit most recent command in text editor + ed [N]: edit numbered command from history + ed [filename]: edit specified file name + + Commands are run after editor is closed. + + The editor defaults to the EDITOR environment variable. You + can also use `set edit ` to choose an editor. + ''' if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") filename = self.default_file_name @@ -1143,43 +1065,49 @@ class Cmd(cmd.Cmd): try: buffer = self.last_matching(int(arg)) except ValueError: - filename = arg - buffer = '' + filename = arg + buffer = '' else: buffer = self.history[-1] if buffer: f = open(os.path.expanduser(filename), 'w') f.write(buffer or '') - f.close() - + f.close() os.system('%s %s' % (self.editor, filename)) self.do__load(filename) + do_edit = do_ed - - saveparser = (pyparsing.Optional(pyparsing.Word(pyparsing.nums)^'*')("idx") + + + saveparser = (pyparsing.Optional(pyparsing.Word(pyparsing.nums)^'*')("idx") + pyparsing.Optional(pyparsing.Word(legalChars + '/\\'))("fname") + - pyparsing.stringEnd) + pyparsing.stringEnd) + + def do_save(self, arg): - """`save [N] [filename.ext]` + ''' + `save [N] [filename.ext]` Saves command from history to file. - | N => Number of command (from history), or `*`; - | most recent command if omitted""" - + | N => Number of command (from history), or `*`; + | most recent command if omitted + ''' try: args = self.saveparser.parseString(arg) except pyparsing.ParseException: self.perror('Could not understand save target %s' % arg) raise SyntaxError(self.do_save.__doc__) + fname = args.fname or self.default_file_name + if args.idx == '*': saveme = '\n\n'.join(self.history[:]) elif args.idx: saveme = self.history[int(args.idx)-1] else: saveme = self.history[-1] + try: f = open(os.path.expanduser(fname), 'w') f.write(saveme) @@ -1188,38 +1116,12 @@ class Cmd(cmd.Cmd): except Exception, e: self.perror('Error saving %s' % (fname)) raise - - def read_file_or_url(self, fname): - # TODO: not working on localhost - if isinstance(fname, file): - result = open(fname, 'r') - else: - match = self.urlre.match(fname) - if match: - result = urllib.urlopen(match.group(1)) - else: - fname = os.path.expanduser(fname) - try: - result = open(os.path.expanduser(fname), 'r') - except IOError: - result = open('%s.%s' % (os.path.expanduser(fname), - self.defaultExtension), 'r') - return result - - def do__relative_load(self, arg=None): + + + def do_load(self, arg=None): + ''' + Runs script of command(s) from a file or URL. ''' - Runs commands in script at file or URL; if this is called from within an - already-running script, the filename will be interpreted relative to the - already-running script's directory.''' - if arg: - arg = arg.split(None, 1) - targetname, args = arg[0], (arg[1:] or [''])[0] - targetname = os.path.join(self.current_script_dir or '', targetname) - self.do__load('%s %s' % (targetname, args)) - - urlre = re.compile('(https?://[-\\w\\./]+)') - def do_load(self, arg=None): - """Runs script of command(s) from a file or URL.""" if arg is None: targetname = self.default_file_name else: @@ -1232,7 +1134,7 @@ class Cmd(cmd.Cmd): return keepstate = Statekeeper(self, ('stdin','use_rawinput','prompt', 'continuation_prompt','current_script_dir')) - self.stdin = target + self.stdin = target self.use_rawinput = False self.prompt = self.continuation_prompt = '' self.current_script_dir = os.path.split(targetname)[0] @@ -1240,357 +1142,66 @@ class Cmd(cmd.Cmd): self.stdin.close() keepstate.restore() self.lastcmd = '' - return stop and (stop != self._STOP_SCRIPT_NO_EXIT) + return stop and (stop != self._STOP_SCRIPT_NO_EXIT) + do__load = do_load # avoid an unfortunate legacy use of do_load from sqlpython - + def do_run(self, arg): - """run [arg]: re-runs an earlier command - - no arg -> run most recent command - arg is integer -> run one history item, by index - arg is string -> run most recent command by string search - arg is /enclosed in forward-slashes/ -> run most recent by regex - """ + ''' + run [arg]: re-runs an earlier command + + no arg -> run most recent command + arg is integer -> run one history item, by index + arg is string -> run most recent command by string search + /arg enclosed in slashes/ -> run most recent by regex + ''' 'run [N]: runs the SQL that was run N commands ago' runme = self.last_matching(arg) self.pfeedback(runme) if runme: stop = self.onecmd_plus_hooks(runme) - do_r = do_run - - def fileimport(self, statement, source): - try: - f = open(os.path.expanduser(source)) - except IOError: - self.stdout.write("Couldn't read from file %s\n" % source) - return '' - data = f.read() - f.close() - return data - def runTranscriptTests(self, callargs): - class TestMyAppCase(Cmd2TestCase): - CmdApp = self.__class__ - self.__class__.testfiles = callargs - sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() - testcase = TestMyAppCase() - runner = unittest.TextTestRunner() - result = runner.run(testcase) - result.printErrors() + do_r = do_run - def run_commands_at_invocation(self, callargs): - for initial_command in callargs: - if self.onecmd_plus_hooks(initial_command + '\n'): - return self._STOP_AND_EXIT def cmdloop(self): parser = optparse.OptionParser() - parser.add_option('-t', '--test', dest='test', - action="store_true", - help='Test against transcript(s) in FILE (wildcards OK)') + parser.add_option('-t', '--test', + dest ='test', + action ='store_true', + help ='Test against transcript(s) in FILE (wildcards OK)') (callopts, callargs) = parser.parse_args() if callopts.test: self.runTranscriptTests(callargs) else: if not self.run_commands_at_invocation(callargs): - self._cmdloop() - -class HistoryItem(str): - listformat = '-------------------------[%d]\n%s\n' - def __init__(self, instr): - str.__init__(self) - self.lowercase = self.lower() - self.idx = None - def pr(self): - return self.listformat % (self.idx, str(self)) - -class History(list): - '''A list of HistoryItems that knows how to respond to user requests. - >>> h = History([HistoryItem('first'), HistoryItem('second'), HistoryItem('third'), HistoryItem('fourth')]) - >>> h.span('-2..') - ['third', 'fourth'] - >>> h.span('2..3') - ['second', 'third'] - >>> h.span('3') - ['third'] - >>> h.span(':') - ['first', 'second', 'third', 'fourth'] - >>> h.span('2..') - ['second', 'third', 'fourth'] - >>> h.span('-1') - ['fourth'] - >>> h.span('-2..-3') - ['third', 'second'] - >>> h.search('o') - ['second', 'fourth'] - >>> h.search('/IR/') - ['first', 'third'] - ''' - def zero_based_index(self, onebased): - result = onebased - if result > 0: - result -= 1 - return result - def to_index(self, raw): - if raw: - result = self.zero_based_index(int(raw)) - else: - result = None - return result - def search(self, target): - target = target.strip() - if target[0] == target[-1] == '/' and len(target) > 1: - target = target[1:-1] - else: - target = re.escape(target) - pattern = re.compile(target, re.IGNORECASE) - return [s for s in self if pattern.search(s)] - spanpattern = re.compile(r'^\s*(?P\-?\d+)?\s*(?P:|(\.{2,}))?\s*(?P\-?\d+)?\s*$') - def span(self, raw): - if raw.lower() in ('*', '-', 'all'): - raw = ':' - results = self.spanpattern.search(raw) - if not results: - raise IndexError - if not results.group('separator'): - return [self[self.to_index(results.group('start'))]] - start = self.to_index(results.group('start')) - end = self.to_index(results.group('end')) - reverse = False - if end is not None: - if end < start: - (start, end) = (end, start) - reverse = True - end += 1 - result = self[start:end] - if reverse: - result.reverse() - return result - - rangePattern = re.compile(r'^\s*(?P[\d]+)?\s*\-\s*(?P[\d]+)?\s*$') - def append(self, new): - new = HistoryItem(new) - list.append(self, new) - new.idx = len(self) - def extend(self, new): - for n in new: - self.append(n) - - def get(self, getme=None, fromEnd=False): - if not getme: - return self - try: - getme = int(getme) - if getme < 0: - return self[:(-1 * getme)] - else: - return [self[getme-1]] - except IndexError: - return [] - except ValueError: - rangeResult = self.rangePattern.search(getme) - if rangeResult: - start = rangeResult.group('start') or None - end = rangeResult.group('start') or None - if start: - start = int(start) - 1 - if end: - end = int(end) - return self[start:end] - - getme = getme.strip() - - if getme.startswith(r'/') and getme.endswith(r'/'): - finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE) - def isin(hi): - return finder.search(hi) - else: - def isin(hi): - return (getme.lower() in hi.lowercase) - return [itm for itm in self if isin(itm)] - -class NotSettableError(Exception): - pass - -def cast(current, new): - """Tries to force a new value into the same type as the current.""" - typ = type(current) - if typ == bool: - try: - return bool(int(new)) - except (ValueError, TypeError): - pass - try: - new = new.lower() - except: - pass - if (new=='on') or (new[0] in ('y','t')): - return True - if (new=='off') or (new[0] in ('n','f')): - return False - else: - try: - return typ(new) - except: - pass - print ("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new)) - return current - -class Statekeeper(object): - def __init__(self, obj, attribs): - self.obj = obj - self.attribs = attribs - if self.obj: - self.save() - def save(self): - for attrib in self.attribs: - setattr(self, attrib, getattr(self.obj, attrib)) - def restore(self): - if self.obj: - for attrib in self.attribs: - setattr(self.obj, attrib, getattr(self, attrib)) - -class Borg(object): - '''All instances of any Borg subclass will share state. - from Python Cookbook, 2nd Ed., recipe 6.16''' - _shared_state = {} - def __new__(cls, *a, **k): - obj = object.__new__(cls, *a, **k) - obj.__dict__ = cls._shared_state - return obj - -class OutputTrap(Borg): - '''Instantiate an OutputTrap to divert/capture ALL stdout output. For use in unit testing. - Call `tearDown()` to return to normal output.''' - def __init__(self): - self.contents = '' - self.old_stdout = sys.stdout - sys.stdout = self - def write(self, txt): - self.contents += txt - def read(self): - result = self.contents - self.contents = '' - return result - def tearDown(self): - sys.stdout = self.old_stdout - self.contents = '' - -class Cmd2TestCase(unittest.TestCase): - '''Subclass this, setting CmdApp, to make a unittest.TestCase class - that will execute the commands in a transcript file and expect the results shown. - See example.py''' - CmdApp = None - def fetchTranscripts(self): - self.transcripts = {} - for fileset in self.CmdApp.testfiles: - for fname in glob.glob(fileset): - tfile = open(fname) - self.transcripts[fname] = iter(tfile.readlines()) - tfile.close() - if not len(self.transcripts): - raise (StandardError,), "No test files found - nothing to test." - def setUp(self): - if self.CmdApp: - self.outputTrap = OutputTrap() - self.cmdapp = self.CmdApp() - self.fetchTranscripts() - def runTest(self): # was testall - if self.CmdApp: - its = sorted(self.transcripts.items()) - for (fname, transcript) in its: - self._test_transcript(fname, transcript) - regexPattern = pyparsing.QuotedString(quoteChar=r'/', escChar='\\', multiline=True, unquoteResults=True) - regexPattern.ignore(pyparsing.cStyleComment) - notRegexPattern = pyparsing.Word(pyparsing.printables) - notRegexPattern.setParseAction(lambda t: re.escape(t[0])) - expectationParser = regexPattern | notRegexPattern - anyWhitespace = re.compile(r'\s', re.DOTALL | re.MULTILINE) - def _test_transcript(self, fname, transcript): - lineNum = 0 - finished = False - line = transcript.next() - lineNum += 1 - tests_run = 0 - while not finished: - # Scroll forward to where actual commands begin - while not line.startswith(self.cmdapp.prompt): - try: - line = transcript.next() - except StopIteration: - finished = True - break - lineNum += 1 - command = [line[len(self.cmdapp.prompt):]] - line = transcript.next() - # Read the entirety of a multi-line command - while line.startswith(self.cmdapp.continuation_prompt): - command.append(line[len(self.cmdapp.continuation_prompt):]) - try: - line = transcript.next() - except StopIteration: - raise (StopIteration, - 'Transcript broke off while reading command beginning at line %d with\n%s' - % (command[0])) - lineNum += 1 - command = ''.join(command) - # Send the command into the application and capture the resulting output - stop = self.cmdapp.onecmd_plus_hooks(command) - #TODO: should act on ``stop`` - result = self.outputTrap.read() - # Read the expected result from transcript - if line.startswith(self.cmdapp.prompt): - message = '\nFile %s, line %d\nCommand was:\n%s\nExpected: (nothing)\nGot:\n%s\n'%\ - (fname, lineNum, command, result) - self.assert_(not(result.strip()), message) - continue - expected = [] - while not line.startswith(self.cmdapp.prompt): - expected.append(line) - try: - line = transcript.next() - except StopIteration: - finished = True - break - lineNum += 1 - expected = ''.join(expected) - # Compare actual result to expected - message = '\nFile %s, line %d\nCommand was:\n%s\nExpected:\n%s\nGot:\n%s\n'%\ - (fname, lineNum, command, expected, result) - expected = self.expectationParser.transformString(expected) - # checking whitespace is a pain - let's skip it - expected = self.anyWhitespace.sub('', expected) - result = self.anyWhitespace.sub('', result) - self.assert_(re.match(expected, result, re.MULTILINE | re.DOTALL), message) - - def tearDown(self): - if self.CmdApp: - self.outputTrap.tearDown() + self._cmdloop() + + if __name__ == '__main__': + # This is only run when this file is called as an executable. doctest.testmod(optionflags = doctest.NORMALIZE_WHITESPACE) - + ''' -To make your application transcript-testable, replace +To make your application transcript-testable, replace :: app = MyApp() app.cmdloop() - + with :: app = MyApp() cmd2.run(app) - + Then run a session of your application and paste the entire screen contents into a file, ``transcript.test``, and invoke the test like:: python myapp.py --test transcript.test Wildcards can be used to test against multiple transcript files. -''' - - +''' \ No newline at end of file diff --git a/errors.py b/errors.py index a59564b..ca727aa 100644 --- a/errors.py +++ b/errors.py @@ -1,3 +1,133 @@ ''' This file should contain define all the Exceptions/Errors in Cmd2. -''' \ No newline at end of file +''' + + +class EmptyStatement(Exception): + # @FIXME + # Add docstring description + pass + + +class NotSettableError(Exception): + # @FIXME + # Add docstring description + pass + + +class PasteBufferError(EnvironmentError): + # @FIXME + # Add docstring description + + if sys.platform[:3] == 'win': + errmsg = """ + Redirecting to or from paste buffer requires pywin32 + to be installed on your operating system. + + Download: + http://sourceforge.net/projects/pywin32/ + """ + + elif sys.platform[:3] == 'dar': + # Use built in pbcopy on Mac OSX + pass + + else: + errmsg = """ + Redirecting to or from paste buffer requires xclip + to be installed on your operating system. + + On Debian/Ubuntu, install with: + 'sudo apt-get install xclip' + """ + + def __init__(self): + Exception.__init__(self, self.errmsg) + + +# +# @TODO +# Why is this code outside the PasteBufferError class? +# +# Ideally, it should be inside the class, and all external +# use of it should be brief. +# +pastebufferr = """ + Redirecting to or from paste buffer requires %s + to be installed on operating system. + %s + """ + +if subprocess.mswindows: + try: + import win32clipboard + def get_paste_buffer(): + win32clipboard.OpenClipboard(0) + try: + result = win32clipboard.GetClipboardData() + except TypeError: + result = '' #non-text + win32clipboard.CloseClipboard() + return result + def write_to_paste_buffer(txt): + win32clipboard.OpenClipboard(0) + win32clipboard.EmptyClipboard() + win32clipboard.SetClipboardText(txt) + win32clipboard.CloseClipboard() + except ImportError: + def get_paste_buffer(*args): + raise OSError, pastebufferr % ('pywin32', 'Download from http://sourceforge.net/projects/pywin32/') + write_to_paste_buffer = get_paste_buffer +elif sys.platform == 'darwin': # Mac OS X + can_clip = False + try: + # test for pbcopy - AFAIK, should always be installed on MacOS + subprocess.check_call('pbcopy -help', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + can_clip = True + except (subprocess.CalledProcessError, OSError, IOError): + pass + if can_clip: + def get_paste_buffer(): + pbcopyproc = subprocess.Popen('pbcopy -help', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + return pbcopyproc.stdout.read() + def write_to_paste_buffer(txt): + pbcopyproc = subprocess.Popen('pbcopy', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + pbcopyproc.communicate(txt.encode()) + else: + def get_paste_buffer(*args): + raise OSError, pastebufferr % ('pbcopy', 'On MacOS X - error should not occur - part of the default installation') + write_to_paste_buffer = get_paste_buffer +else: + can_clip = False + try: + subprocess.check_call('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + can_clip = True + except AttributeError: # check_call not defined, Python < 2.5 + try: + teststring = 'Testing for presence of xclip.' + xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + xclipproc.stdin.write(teststring) + xclipproc.stdin.close() + xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + if xclipproc.stdout.read() == teststring: + can_clip = True + except Exception: # hate a bare Exception call, but exception classes vary too much b/t stdlib versions + pass + except Exception: + pass # something went wrong with xclip and we cannot use it + if can_clip: + def get_paste_buffer(): + xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + return xclipproc.stdout.read() + def write_to_paste_buffer(txt): + xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + xclipproc.stdin.write(txt.encode()) + xclipproc.stdin.close() + # but we want it in both the "primary" and "mouse" clipboards + xclipproc = subprocess.Popen('xclip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + xclipproc.stdin.write(txt.encode()) + xclipproc.stdin.close() + else: + def get_paste_buffer(*args): + raise OSError, pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"') + write_to_paste_buffer = get_paste_buffer \ No newline at end of file diff --git a/parsers.py b/parsers.py index 64f3259..e276be8 100644 --- a/parsers.py +++ b/parsers.py @@ -1,3 +1,60 @@ ''' This file should contain all parser definitions for Cmd2. -''' \ No newline at end of file +''' + +class OptionParser(optparse.OptionParser): + def exit(self, status=0, msg=None): + self.values._exit = True + if msg: + print (msg) + + def print_help(self, *args, **kwargs): + try: + print (self._func.__doc__) + except AttributeError: + pass + optparse.OptionParser.print_help(self, *args, **kwargs) + + def error(self, msg): + """error(msg : string) + + Print a usage message incorporating 'msg' to stderr and exit. + If you override this in a subclass, it should not return -- it + should either exit or raise an exception. + """ + raise optparse.OptParseError(msg) + + +def remaining_args(oldArgs, newArgList): + '''Preserves the original argument's spacing after + removing options. + + >>> remaining_args('-f bar bar cow', ['bar', 'cow']) + 'bar cow' + ''' + pattern = '\s+'.join(re.escape(a) for a in newArgList) + '\s*$' + matchObj = re.search(pattern, oldArgs) + + return oldArgs[matchObj.start():] + + +class ParsedString(str): + def full_parsed_statement(self): + new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args)) + new.parsed = self.parsed + new.parser = self.parser + return new + def with_args_replaced(self, newargs): + new = ParsedString(newargs) + new.parsed = self.parsed + new.parser = self.parser + new.parsed['args'] = newargs + new.parsed.statement['args'] = newargs + return new + + +optparse.Values.get = _attr_get_ + +options_defined = [] # used to distinguish --options from SQL-style --comments + +pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') \ No newline at end of file diff --git a/support.py b/support.py index 63883b9..71043f0 100644 --- a/support.py +++ b/support.py @@ -1,6 +1,356 @@ ''' -This file should contain all "helper" code for Cmd2. +This file should contain all "helper" code for Cmd2. -This includes things such as hook methods, ParsedString, Stubborndict, Borg, Statekeeper, +This includes things such as hook methods, ParsedString, Stubborndict, Borg, Statekeeper, History, and so on. -''' \ No newline at end of file +''' + +import decorator + + +class StubbornDict(dict): + '''Dictionary that tolerates many input formats. + Create it with stubbornDict(arg) factory function. + + >>> d = StubbornDict(large='gross', small='klein') + >>> sorted(d.items()) + [('large', 'gross'), ('small', 'klein')] + >>> d.append(['plain', ' plaid']) + >>> sorted(d.items()) + [('large', 'gross'), ('plaid', ''), ('plain', ''), ('small', 'klein')] + >>> d += ' girl Frauelein, Maedchen\\n\\n shoe schuh' + >>> sorted(d.items()) + [('girl', 'Frauelein, Maedchen'), ('large', 'gross'), ('plaid', ''), ('plain', ''), ('shoe', 'schuh'), ('small', 'klein')] + ''' + def update(self, arg): + dict.update(self, StubbornDict.to_dict(arg)) + append = update + def __iadd__(self, arg): + self.update(arg) + return self + def __add__(self, arg): + selfcopy = copy.copy(self) + selfcopy.update(stubbornDict(arg)) + return selfcopy + def __radd__(self, arg): + selfcopy = copy.copy(self) + selfcopy.update(stubbornDict(arg)) + return selfcopy + + @classmethod + def to_dict(cls, arg): + 'Generates dictionary from string or list of strings' + if hasattr(arg, 'splitlines'): + arg = arg.splitlines() + if hasattr(arg, '__reversed__'): + result = {} + for a in arg: + a = a.strip() + if a: + key_val = a.split(None, 1) + key = key_val[0] + if len(key_val) > 1: + val = key_val[1] + else: + val = '' + result[key] = val + else: + result = arg + return result + + +# @TODO +# Why is this defined outside of the StubbornDict class? +def stubbornDict(*arg, **kwarg): + # @FIXME + # Add docstring describing this method's + # purpose and relation to the StubbornDict class + ''' + >>> sorted(stubbornDict('cow a bovine\\nhorse an equine').items()) + [('cow', 'a bovine'), ('horse', 'an equine')] + >>> sorted(stubbornDict(['badger', 'porcupine a poky creature']).items()) + [('badger', ''), ('porcupine', 'a poky creature')] + >>> sorted(stubbornDict(turtle='has shell', frog='jumpy').items()) + [('frog', 'jumpy'), ('turtle', 'has shell')] + ''' + result = {} + + for a in arg: + result.update(StubbornDict.to_dict(a)) + + result.update(kwarg) + + return StubbornDict(result) + + +class HistoryItem(str): + listformat = '-------------------------[%d]\n%s\n' + def __init__(self, instr): + str.__init__(self) + self.lowercase = self.lower() + self.idx = None + def pr(self): + return self.listformat % (self.idx, str(self)) + + +class History(list): + '''A list of HistoryItems that knows how to respond to user requests. + >>> h = History([HistoryItem('first'), HistoryItem('second'), HistoryItem('third'), HistoryItem('fourth')]) + >>> h.span('-2..') + ['third', 'fourth'] + >>> h.span('2..3') + ['second', 'third'] + >>> h.span('3') + ['third'] + >>> h.span(':') + ['first', 'second', 'third', 'fourth'] + >>> h.span('2..') + ['second', 'third', 'fourth'] + >>> h.span('-1') + ['fourth'] + >>> h.span('-2..-3') + ['third', 'second'] + >>> h.search('o') + ['second', 'fourth'] + >>> h.search('/IR/') + ['first', 'third'] + ''' + def zero_based_index(self, onebased): + result = onebased + if result > 0: + result -= 1 + return result + def to_index(self, raw): + if raw: + result = self.zero_based_index(int(raw)) + else: + result = None + return result + def search(self, target): + target = target.strip() + if target[0] == target[-1] == '/' and len(target) > 1: + target = target[1:-1] + else: + target = re.escape(target) + pattern = re.compile(target, re.IGNORECASE) + return [s for s in self if pattern.search(s)] + spanpattern = re.compile(r'^\s*(?P\-?\d+)?\s*(?P:|(\.{2,}))?\s*(?P\-?\d+)?\s*$') + def span(self, raw): + if raw.lower() in ('*', '-', 'all'): + raw = ':' + results = self.spanpattern.search(raw) + if not results: + raise IndexError + if not results.group('separator'): + return [self[self.to_index(results.group('start'))]] + start = self.to_index(results.group('start')) + end = self.to_index(results.group('end')) + reverse = False + if end is not None: + if end < start: + (start, end) = (end, start) + reverse = True + end += 1 + result = self[start:end] + if reverse: + result.reverse() + return result + + rangePattern = re.compile(r'^\s*(?P[\d]+)?\s*\-\s*(?P[\d]+)?\s*$') + def append(self, new): + new = HistoryItem(new) + list.append(self, new) + new.idx = len(self) + def extend(self, new): + for n in new: + self.append(n) + + def get(self, getme=None, fromEnd=False): + if not getme: + return self + try: + getme = int(getme) + if getme < 0: + return self[:(-1 * getme)] + else: + return [self[getme-1]] + except IndexError: + return [] + except ValueError: + rangeResult = self.rangePattern.search(getme) + if rangeResult: + start = rangeResult.group('start') or None + end = rangeResult.group('start') or None + if start: + start = int(start) - 1 + if end: + end = int(end) + return self[start:end] + + getme = getme.strip() + + if getme.startswith(r'/') and getme.endswith(r'/'): + finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE) + def isin(hi): + return finder.search(hi) + else: + def isin(hi): + return (getme.lower() in hi.lowercase) + return [itm for itm in self if isin(itm)] + + +class Statekeeper(object): + def __init__(self, obj, attribs): + self.obj = obj + self.attribs = attribs + if self.obj: + self.save() + def save(self): + for attrib in self.attribs: + setattr(self, attrib, getattr(self.obj, attrib)) + def restore(self): + if self.obj: + for attrib in self.attribs: + setattr(self.obj, attrib, getattr(self, attrib)) + + +class Borg(object): + '''All instances of any Borg subclass will share state. + from Python Cookbook, 2nd Ed., recipe 6.16''' + _shared_state = {} + def __new__(cls, *a, **k): + obj = object.__new__(cls, *a, **k) + obj.__dict__ = cls._shared_state + return obj + + +class OutputTrap(Borg): + '''Instantiate an OutputTrap to divert/capture ALL stdout output. For use in unit testing. + Call `tearDown()` to return to normal output.''' + def __init__(self): + self.contents = '' + self.old_stdout = sys.stdout + sys.stdout = self + def write(self, txt): + self.contents += txt + def read(self): + result = self.contents + self.contents = '' + return result + def tearDown(self): + sys.stdout = self.old_stdout + self.contents = '' + + +class EmbeddedConsoleExit(SystemExit): + pass + + +@decorator +def options(option_list, arg_desc="arg"): + '''Used as a decorator and passed a list of optparse-style options, + alters a cmd2 method to populate its ``opts`` argument from its + raw text argument. + + Example: transform + def do_something(self, arg): + + into + @options([make_option('-q', '--quick', action="store_true", + help="Makes things fast")], + "source dest") + def do_something(self, arg, opts): + if opts.quick: + self.fast_button = True + ''' + if not isinstance(option_list, list): + option_list = [option_list] + for opt in option_list: + options_defined.append(pyparsing.Literal(opt.get_opt_string())) + def option_setup(func): + optionParser = OptionParser() + for opt in option_list: + optionParser.add_option(opt) + optionParser.set_usage("%s [options] %s" % (func.__name__[3:], arg_desc)) + optionParser._func = func + def new_func(instance, arg): + try: + opts, newArgList = optionParser.parse_args(arg.split()) + # Must find the remaining args in the original argument list, but + # mustn't include the command itself + #if hasattr(arg, 'parsed') and newArgList[0] == arg.parsed.command: + # newArgList = newArgList[1:] + newArgs = remaining_args(arg, newArgList) + if isinstance(arg, ParsedString): + arg = arg.with_args_replaced(newArgs) + else: + arg = newArgs + except optparse.OptParseError, e: + print (e) + optionParser.print_help() + return + if hasattr(opts, '_exit'): + return None + result = func(instance, arg, opts) + return result + new_func.__doc__ = '%s\n%s' % (func.__doc__, optionParser.format_help()) + return new_func + return option_setup + + +def cast(current, new): + """Tries to force a new value into the same type as the current.""" + typ = type(current) + if typ == bool: + try: + return bool(int(new)) + except (ValueError, TypeError): + pass + try: + new = new.lower() + except: + pass + if (new=='on') or (new[0] in ('y','t')): + return True + if (new=='off') or (new[0] in ('n','f')): + return False + else: + try: + return typ(new) + except: + pass + print ("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new)) + return current + + +def _attr_get_(obj, attr): + '''Returns an attribute's value (or None if undefined--no error). + + Analagous to .get() for dictionaries. Useful when checking for + option values that may not have be defined on a given method. + ''' + try: + return getattr(obj, attr) + except AttributeError: + return None + + +def replace_with_file_contents(fname): + if fname: + try: + result = open(os.path.expanduser(fname[0])).read() + except IOError: + result = '< %s' % fname[0] # wasn't a file after all + else: + result = get_paste_buffer() + return result + + +def ljust(x, width, fillchar=' '): + '''Like str.ljust(), but for lists.''' + if hasattr(x, 'ljust'): + return x.ljust(width, fillchar) + else: + if len(x) < width: + x = (x + [fillchar] * width)[:width] + return x \ No newline at end of file diff --git a/tests.py b/tests.py index 0c95cb5..3594c73 100644 --- a/tests.py +++ b/tests.py @@ -1,3 +1,116 @@ ''' This file should contain all tests for Cmd2. -''' \ No newline at end of file +''' + + +class Cmd2TestCase(unittest.TestCase): + ''' + Subclass this (setting CmdApp) to make a "unittest.TestCase" class + that executes commands from a transcript file and expects the results shown. + + See example.py. + ''' + CmdApp = None + + def fetchTranscripts(self): + self.transcripts = {} + for fileset in self.CmdApp.testfiles: + for fname in glob.glob(fileset): + tfile = open(fname) + self.transcripts[fname] = iter(tfile.readlines()) + tfile.close() + if not len(self.transcripts): + raise (StandardError,), "No test files found...nothing to test." + + def setUp(self): + if self.CmdApp: + self.outputTrap = OutputTrap() + self.cmdapp = self.CmdApp() + self.fetchTranscripts() + + def runTest(self): # was testall + if self.CmdApp: + its = sorted(self.transcripts.items()) + for (fname, transcript) in its: + self._test_transcript(fname, transcript) + + regexPattern = pyparsing.QuotedString(quoteChar=r'/', escChar='\\', multiline=True, unquoteResults=True) + regexPattern.ignore(pyparsing.cStyleComment) + + notRegexPattern = pyparsing.Word(pyparsing.printables) + notRegexPattern.setParseAction(lambda t: re.escape(t[0])) + + expectationParser = regexPattern | notRegexPattern + anyWhitespace = re.compile(r'\s', re.DOTALL | re.MULTILINE) + + def _test_transcript(self, fname, transcript): + lineNum = 0 + finished = False + line = transcript.next() + lineNum += 1 + tests_run = 0 + + while not finished: + # Scroll forward to where actual commands begin + while not line.startswith(self.cmdapp.prompt): + try: + line = transcript.next() + except StopIteration: + finished = True + break + lineNum += 1 + command = [line[len(self.cmdapp.prompt):]] + line = transcript.next() + + # Read the entirety of a multi-line command + while line.startswith(self.cmdapp.continuation_prompt): + command.append(line[len(self.cmdapp.continuation_prompt):]) + try: + line = transcript.next() + except StopIteration: + raise (StopIteration, + 'Transcript broke off while reading command beginning at line %d with\n%s' + % (command[0])) + lineNum += 1 + + command = ''.join(command) + + # Send the command into the application and capture the resulting output + stop = self.cmdapp.onecmd_plus_hooks(command) + + #TODO: should act on ``stop`` + result = self.outputTrap.read() + + # Read the expected result from transcript + if line.startswith(self.cmdapp.prompt): + message = '\nFile %s, line %d\nCommand was:\n%s\nExpected: (nothing)\nGot:\n%s\n'%\ + (fname, lineNum, command, result) + self.assert_(not(result.strip()), message) + continue + + expected = [] + + while not line.startswith(self.cmdapp.prompt): + expected.append(line) + try: + line = transcript.next() + except StopIteration: + finished = True + break + lineNum += 1 + + expected = ''.join(expected) + + # Compare actual result to expected + message = '\nFile %s, line %d\nCommand was:\n%s\nExpected:\n%s\nGot:\n%s\n'%\ + (fname, lineNum, command, expected, result) + expected = self.expectationParser.transformString(expected) + + # checking whitespace is a pain - let's skip it + expected = self.anyWhitespace.sub('', expected) + result = self.anyWhitespace.sub('', result) + self.assert_(re.match(expected, result, re.MULTILINE | re.DOTALL), message) + + def tearDown(self): + if self.CmdApp: + self.outputTrap.tearDown() -- cgit v1.2.1