summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorcliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2011-03-18 00:49:16 +0000
committercliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2011-03-18 00:49:16 +0000
commita66fb14ae48a70d97926d197f3af157e2f2b1ab1 (patch)
treeedfef65be9c6de219990224b369d2d601267a616 /test
parente468387ae4cbfdc4e79b26b673dca0ac9cc5038c (diff)
downloadpyserial-a66fb14ae48a70d97926d197f3af157e2f2b1ab1.tar.gz
- add a "search path" for (URL) protocol handlers so that user can add its own
- add unit test for custom handler - doc update - increment version to 2.6-pre1 git-svn-id: http://svn.code.sf.net/p/pyserial/code/trunk/pyserial@388 f19166aa-fa4f-0410-85c2-fa1106f25c8a
Diffstat (limited to 'test')
-rw-r--r--test/handlers/__init__.py0
-rw-r--r--test/handlers/protocol_test.py202
-rw-r--r--test/test_url.py54
3 files changed, 256 insertions, 0 deletions
diff --git a/test/handlers/__init__.py b/test/handlers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/handlers/__init__.py
diff --git a/test/handlers/protocol_test.py b/test/handlers/protocol_test.py
new file mode 100644
index 0000000..57cdf58
--- /dev/null
+++ b/test/handlers/protocol_test.py
@@ -0,0 +1,202 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# This module implements a URL dummy handler for serial_for_url.
+#
+# (C) 2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# URL format: test://
+
+from serial.serialutil import *
+import time
+import socket
+import logging
+
+# map log level names to constants. used in fromURL()
+LOGGER_LEVELS = {
+ 'debug': logging.DEBUG,
+ 'info': logging.INFO,
+ 'warning': logging.WARNING,
+ 'error': logging.ERROR,
+ }
+
+class DummySerial(SerialBase):
+ """Serial port implementation for plain sockets."""
+
+ def open(self):
+ """Open port with current settings. This may throw a SerialException
+ if the port cannot be opened."""
+ self.logger = None
+ if self._port is None:
+ raise SerialException("Port must be configured before it can be used.")
+ # not that there anything to configure...
+ self._reconfigurePort()
+ # all things set up get, now a clean start
+ self._isOpen = True
+
+ def _reconfigurePort(self):
+ """Set communication parameters on opened port. for the test://
+ protocol all settings are ignored!"""
+ if self.logger:
+ self.logger.info('ignored port configuration change')
+
+ def close(self):
+ """Close port"""
+ if self._isOpen:
+ self._isOpen = False
+
+ def makeDeviceName(self, port):
+ raise SerialException("there is no sensible way to turn numbers into URLs")
+
+ def fromURL(self, url):
+ """extract host and port from an URL string"""
+ if url.lower().startswith("test://"): url = url[7:]
+ try:
+ # is there a "path" (our options)?
+ if '/' in url:
+ # cut away options
+ url, options = url.split('/', 1)
+ # process options now, directly altering self
+ for option in options.split('/'):
+ if '=' in option:
+ option, value = option.split('=', 1)
+ else:
+ value = None
+ if option == 'logging':
+ logging.basicConfig() # XXX is that good to call it here?
+ self.logger = logging.getLogger('pySerial.test')
+ self.logger.setLevel(LOGGER_LEVELS[value])
+ self.logger.debug('enabled logging')
+ else:
+ raise ValueError('unknown option: %r' % (option,))
+ except ValueError, e:
+ raise SerialException('expected a string in the form "[test://][option[/option...]]": %s' % e)
+ return (host, port)
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
+
+ def inWaiting(self):
+ """Return the number of characters currently in the input buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ # set this one to debug as the function could be called often...
+ self.logger.debug('WARNING: inWaiting returns dummy value')
+ return 0 # hmmm, see comment in read()
+
+ def read(self, size=1):
+ """Read size bytes from the serial port. If a timeout is set it may
+ return less characters as requested. With no timeout it will block
+ until the requested number of bytes is read."""
+ if not self._isOpen: raise portNotOpenError
+ data = '123' # dummy data
+ return bytes(data)
+
+ def write(self, data):
+ """Output the given string over the serial port. Can block if the
+ connection is blocked. May raise SerialException if the connection is
+ closed."""
+ if not self._isOpen: raise portNotOpenError
+ # nothing done
+ return len(data)
+
+ def flushInput(self):
+ """Clear input buffer, discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushInput')
+
+ def flushOutput(self):
+ """Clear output buffer, aborting the current output and
+ discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushOutput')
+
+ def sendBreak(self, duration=0.25):
+ """Send break condition. Timed, returns to idle state after given
+ duration."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored sendBreak(%r)' % (duration,))
+
+ def setBreak(self, level=True):
+ """Set break: Controls TXD. When active, to transmitting is
+ possible."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setBreak(%r)' % (level,))
+
+ def setRTS(self, level=True):
+ """Set terminal status line: Request To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setRTS(%r)' % (level,))
+
+ def setDTR(self, level=True):
+ """Set terminal status line: Data Terminal Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setDTR(%r)' % (level,))
+
+ def getCTS(self):
+ """Read terminal status line: Clear To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCTS()')
+ return True
+
+ def getDSR(self):
+ """Read terminal status line: Data Set Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getDSR()')
+ return True
+
+ def getRI(self):
+ """Read terminal status line: Ring Indicator"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getRI()')
+ return False
+
+ def getCD(self):
+ """Read terminal status line: Carrier Detect"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCD()')
+ return True
+
+ # - - - platform specific - - -
+ # None so far
+
+
+# assemble Serial class with the platform specific implementation and the base
+# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
+# library, derive from io.RawIOBase
+try:
+ import io
+except ImportError:
+ # classic version with our own file-like emulation
+ class Serial(DummySerial, FileLike):
+ pass
+else:
+ # io library present
+ class Serial(DummySerial, io.RawIOBase):
+ pass
+
+
+# simple client test
+if __name__ == '__main__':
+ import sys
+ s = Serial('test://logging=debug')
+ sys.stdout.write('%s\n' % s)
+
+ sys.stdout.write("write...\n")
+ s.write("hello\n")
+ s.flush()
+ sys.stdout.write("read: %s\n" % s.read(5))
+
+ s.close()
diff --git a/test/test_url.py b/test/test_url.py
new file mode 100644
index 0000000..700bdbb
--- /dev/null
+++ b/test/test_url.py
@@ -0,0 +1,54 @@
+#! /usr/bin/env python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pySerial (http://pyserial.sf.net) (C)2001-2011 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+Cover some of the aspects of serial_for_url and the extension mechanism.
+"""
+
+import unittest
+import time
+import sys
+import serial
+
+
+class Test_URL(unittest.TestCase):
+ """Test serial_for_url"""
+
+ def test_loop(self):
+ """loop interface"""
+ s = serial.serial_for_url('loop://', do_not_open=True)
+
+ def test_bad_url(self):
+ """invalid protocol specified"""
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "imnotknown://")
+
+ def test_custom_url(self):
+ """custom protocol handlers"""
+ # it's unknown
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "test://")
+ # add search path
+ serial.protocol_handler_packages.append('handlers')
+ # now it should work
+ s = serial.serial_for_url("test://")
+ # remove our handler again
+ serial.protocol_handler_packages.remove('handlers')
+ # so it should not work anymore
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "test://")
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()