diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/handlers/__init__.py | 0 | ||||
-rw-r--r-- | test/handlers/protocol_test.py | 202 | ||||
-rw-r--r-- | test/run_all_tests.py | 53 | ||||
-rw-r--r-- | test/test.py | 230 | ||||
-rw-r--r-- | test/test_advanced.py | 188 | ||||
-rw-r--r-- | test/test_high_load.py | 77 | ||||
-rw-r--r-- | test/test_iolib.py | 80 | ||||
-rw-r--r-- | test/test_readline.py | 107 | ||||
-rw-r--r-- | test/test_url.py | 54 |
9 files changed, 991 insertions, 0 deletions
diff --git a/test/handlers/__init__.py b/test/handlers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/handlers/__init__.py diff --git a/test/handlers/protocol_test.py b/test/handlers/protocol_test.py new file mode 100644 index 0000000..42ac4b2 --- /dev/null +++ b/test/handlers/protocol_test.py @@ -0,0 +1,202 @@ +#! python +# +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# This module implements a URL dummy handler for serial_for_url. +# +# (C) 2011 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt +# +# URL format: test:// + +from serial.serialutil import * +import time +import socket +import logging + +# map log level names to constants. used in fromURL() +LOGGER_LEVELS = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + } + +class DummySerial(SerialBase): + """Serial port implementation for plain sockets.""" + + def open(self): + """Open port with current settings. This may throw a SerialException + if the port cannot be opened.""" + self.logger = None + if self._port is None: + raise SerialException("Port must be configured before it can be used.") + # not that there anything to configure... + self._reconfigurePort() + # all things set up get, now a clean start + self._isOpen = True + + def _reconfigurePort(self): + """Set communication parameters on opened port. for the test:// + protocol all settings are ignored!""" + if self.logger: + self.logger.info('ignored port configuration change') + + def close(self): + """Close port""" + if self._isOpen: + self._isOpen = False + + def makeDeviceName(self, port): + raise SerialException("there is no sensible way to turn numbers into URLs") + + def fromURL(self, url): + """extract host and port from an URL string""" + if url.lower().startswith("test://"): url = url[7:] + try: + # is there a "path" (our options)? + if '/' in url: + # cut away options + url, options = url.split('/', 1) + # process options now, directly altering self + for option in options.split('/'): + if '=' in option: + option, value = option.split('=', 1) + else: + value = None + if option == 'logging': + logging.basicConfig() # XXX is that good to call it here? + self.logger = logging.getLogger('pySerial.test') + self.logger.setLevel(LOGGER_LEVELS[value]) + self.logger.debug('enabled logging') + else: + raise ValueError('unknown option: %r' % (option,)) + except ValueError as e: + raise SerialException('expected a string in the form "[test://][option[/option...]]": %s' % e) + return (host, port) + + # - - - - - - - - - - - - - - - - - - - - - - - - + + def inWaiting(self): + """Return the number of characters currently in the input buffer.""" + if not self._isOpen: raise portNotOpenError + if self.logger: + # set this one to debug as the function could be called often... + self.logger.debug('WARNING: inWaiting returns dummy value') + return 0 # hmmm, see comment in read() + + def read(self, size=1): + """Read size bytes from the serial port. If a timeout is set it may + return less characters as requested. With no timeout it will block + until the requested number of bytes is read.""" + if not self._isOpen: raise portNotOpenError + data = '123' # dummy data + return bytes(data) + + def write(self, data): + """Output the given string over the serial port. Can block if the + connection is blocked. May raise SerialException if the connection is + closed.""" + if not self._isOpen: raise portNotOpenError + # nothing done + return len(data) + + def flushInput(self): + """Clear input buffer, discarding all that is in the buffer.""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored flushInput') + + def flushOutput(self): + """Clear output buffer, aborting the current output and + discarding all that is in the buffer.""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored flushOutput') + + def sendBreak(self, duration=0.25): + """Send break condition. Timed, returns to idle state after given + duration.""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored sendBreak(%r)' % (duration,)) + + def setBreak(self, level=True): + """Set break: Controls TXD. When active, to transmitting is + possible.""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored setBreak(%r)' % (level,)) + + def setRTS(self, level=True): + """Set terminal status line: Request To Send""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored setRTS(%r)' % (level,)) + + def setDTR(self, level=True): + """Set terminal status line: Data Terminal Ready""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('ignored setDTR(%r)' % (level,)) + + def getCTS(self): + """Read terminal status line: Clear To Send""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('returning dummy for getCTS()') + return True + + def getDSR(self): + """Read terminal status line: Data Set Ready""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('returning dummy for getDSR()') + return True + + def getRI(self): + """Read terminal status line: Ring Indicator""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('returning dummy for getRI()') + return False + + def getCD(self): + """Read terminal status line: Carrier Detect""" + if not self._isOpen: raise portNotOpenError + if self.logger: + self.logger.info('returning dummy for getCD()') + return True + + # - - - platform specific - - - + # None so far + + +# assemble Serial class with the platform specific implementation and the base +# for file-like behavior. for Python 2.6 and newer, that provide the new I/O +# library, derive from io.RawIOBase +try: + import io +except ImportError: + # classic version with our own file-like emulation + class Serial(DummySerial, FileLike): + pass +else: + # io library present + class Serial(DummySerial, io.RawIOBase): + pass + + +# simple client test +if __name__ == '__main__': + import sys + s = Serial('test://logging=debug') + sys.stdout.write('%s\n' % s) + + sys.stdout.write("write...\n") + s.write("hello\n") + s.flush() + sys.stdout.write("read: %s\n" % s.read(5)) + + s.close() diff --git a/test/run_all_tests.py b/test/run_all_tests.py new file mode 100644 index 0000000..e7f115d --- /dev/null +++ b/test/run_all_tests.py @@ -0,0 +1,53 @@ +#! /usr/bin/env python + +"""\ +UnitTest runner. This one searches for all files named test_*.py and collects +all test cases from these files. Finally it runs all tests and prints a +summary. +""" + +import unittest +import sys +import os +import time + +# inject local copy to avoid testing the installed version instead of the +# working copy (only for 2.x as the sources would need to be translated with +# 2to3 for Python 3, use installed module instead for Python 3). +if sys.version_info < (3, 0): + sys.path.insert(0, '..') + +import serial +print("Patching sys.path to test local version. Testing Version: %s" % (serial.VERSION,)) + +PORT = 'loop://' +if len(sys.argv) > 1: + PORT = sys.argv[1] + +# find files and the tests in them +mainsuite = unittest.TestSuite() +for modulename in [os.path.splitext(x)[0] + for x in os.listdir('.') + if x != __file__ and x.startswith("test") and x.endswith(".py") +]: + try: + module = __import__(modulename) + except ImportError: + print("skipping %s" % (modulename,)) + else: + module.PORT = PORT + testsuite = unittest.findTestCases(module) + print("found %s tests in %r" % (testsuite.countTestCases(), modulename)) + mainsuite.addTest(testsuite) + +verbosity = 1 +if '-v' in sys.argv[1:]: + verbosity = 2 + +# run the collected tests +testRunner = unittest.TextTestRunner(verbosity=verbosity) +#~ testRunner = unittest.ConsoleTestRunner(verbosity=verbosity) +result = testRunner.run(mainsuite) + +# set exit code accordingly to test results +sys.exit(not result.wasSuccessful()) diff --git a/test/test.py b/test/test.py new file mode 100644 index 0000000..25a34c3 --- /dev/null +++ b/test/test.py @@ -0,0 +1,230 @@ +#! /usr/bin/env python +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2001-2008 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest +import threading +import time +import sys +import serial + +# on which port should the tests be performed: +PORT = 0 + +if sys.version_info >= (3, 0): + def data(string): + return bytes(string, 'latin1') + bytes_0to255 = bytes(range(256)) +else: + def data(string): return string + bytes_0to255 = ''.join([chr(x) for x in range(256)]) + + +def segments(data, size=16): + for a in range(0, len(data), size): + yield data[a:a+size] + + +class Test4_Nonblocking(unittest.TestCase): + """Test with timeouts""" + timeout = 0 + + def setUp(self): + self.s = serial.serial_for_url(PORT, timeout=self.timeout) + + def tearDown(self): + self.s.close() + + def test0_Messy(self): + """NonBlocking (timeout=0)""" + # this is only here to write out the message in verbose mode + # because Test3 and Test4 print the same messages + + def test1_ReadEmpty(self): + """timeout: After port open, the input buffer must be empty""" + self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer") + + def test2_Loopback(self): + """timeout: each sent character should return (binary test). + this is also a test for the binary capability of a port.""" + for block in segments(bytes_0to255): + length = len(block) + self.s.write(block) + # there might be a small delay until the character is ready (especially on win32) + time.sleep(0.05) + self.failUnlessEqual(self.s.inWaiting(), length, "expected exactly %d character for inWainting()" % length) + self.failUnlessEqual(self.s.read(length), block)#, "expected a %r which was written before" % block) + self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") + + def test2_LoopbackTimeout(self): + """timeout: test the timeout/immediate return. + partial results should be returned.""" + self.s.write(data("HELLO")) + time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32 and rfc2217) + # read more characters as are available to run in the timeout + self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before") + self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") + + +class Test3_Timeout(Test4_Nonblocking): + """Same tests as the NonBlocking ones but this time with timeout""" + timeout = 1 + + def test0_Messy(self): + """Blocking (timeout=1)""" + # this is only here to write out the message in verbose mode + # because Test3 and Test4 print the same messages + + +class SendEvent(threading.Thread): + def __init__(self, serial, delay=3): + threading.Thread.__init__(self) + self.serial = serial + self.delay = delay + self.x = threading.Event() + self.stopped = 0 + self.start() + + def run(self): + time.sleep(self.delay) + self.x.set() + if not self.stopped: + self.serial.write(data("E")) + self.serial.flush() + + def isSet(self): + return self.x.isSet() + + def stop(self): + self.stopped = 1 + self.x.wait() + +class Test1_Forever(unittest.TestCase): + """Tests a port with no timeout. These tests require that a + character is sent after some time to stop the test, this is done + through the SendEvent class and the Loopback HW.""" + def setUp(self): + self.s = serial.serial_for_url(PORT, timeout=None) + self.event = SendEvent(self.s) + + def tearDown(self): + self.event.stop() + self.s.close() + + def test2_ReadEmpty(self): + """no timeout: after port open, the input buffer must be empty (read). + a character is sent after some time to terminate the test (SendEvent).""" + c = self.s.read(1) + if not (self.event.isSet() and c == data('E')): + self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c)) + + +class Test2_Forever(unittest.TestCase): + """Tests a port with no timeout""" + def setUp(self): + self.s = serial.serial_for_url(PORT, timeout=None) + + def tearDown(self): + self.s.close() + + def test1_inWaitingEmpty(self): + """no timeout: after port open, the input buffer must be empty (inWaiting)""" + self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer") + + def test2_Loopback(self): + """no timeout: each sent character should return (binary test). + this is also a test for the binary capability of a port.""" + for block in segments(bytes_0to255): + length = len(block) + self.s.write(block) + # there might be a small delay until the character is ready (especially on win32 and rfc2217) + time.sleep(0.05) + self.failUnlessEqual(self.s.inWaiting(), length)#, "expected exactly %d character for inWainting()" % length) + self.failUnlessEqual(self.s.read(length), block) #, "expected %r which was written before" % block) + self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer after all sent chars are read") + + +class Test0_DataWires(unittest.TestCase): + """Test modem control lines""" + def setUp(self): + self.s = serial.serial_for_url(PORT) + + def tearDown(self): + self.s.close() + + def test1_RTS(self): + """Test RTS/CTS""" + self.s.setRTS(0) + time.sleep(1.1) + self.failUnless(not self.s.getCTS(), "CTS -> 0") + self.s.setRTS(1) + time.sleep(1.1) + self.failUnless(self.s.getCTS(), "CTS -> 1") + + def test2_DTR(self): + """Test DTR/DSR""" + self.s.setDTR(0) + time.sleep(1.1) + self.failUnless(not self.s.getDSR(), "DSR -> 0") + self.s.setDTR(1) + time.sleep(1.1) + self.failUnless(self.s.getDSR(), "DSR -> 1") + + def test3_RI(self): + """Test RI""" + self.failUnless(not self.s.getRI(), "RI -> 0") + + +class Test_MoreTimeouts(unittest.TestCase): + """Test with timeouts""" + def setUp(self): + # create an closed serial port + self.s = serial.serial_for_url(PORT, do_not_open=True) + + def tearDown(self): + self.s.close() + + def test_WriteTimeout(self): + """Test write() timeout.""" + # use xonxoff setting and the loop-back adapter to switch traffic on hold + self.s.port = PORT + self.s.writeTimeout = 1 + self.s.xonxoff = 1 + self.s.open() + self.s.write(serial.XOFF) + time.sleep(0.5) # some systems need a little delay so that they can react on XOFF + t1 = time.time() + self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200)) + t2 = time.time() + self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1)) + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: %r\n" % PORT) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_advanced.py b/test/test_advanced.py new file mode 100644 index 0000000..0d85e2a --- /dev/null +++ b/test/test_advanced.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python +# needs at least python 2.2.3 + +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2001-2003 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +These tests open a serial port and change all the settings on the fly. +If the port is really correctly configured cannot be determined - that +would require external hardware or a null modem cable and an other +serial port library... Thus it mainly tests that all features are +correctly implemented and that the interface does what it should. + +""" + +import unittest +import serial + +# on which port should the tests be performed: +PORT = 0 + +class Test_ChangeAttributes(unittest.TestCase): + """Test with timeouts""" + + def setUp(self): + # create a closed serial port + self.s = serial.serial_for_url(PORT, do_not_open=True) + + def tearDown(self): + self.s.close() + + def test_PortSetting(self): + self.s.port = PORT + # portstr has to be set + if isinstance(PORT, str): + self.failUnlessEqual(self.s.portstr.lower(), PORT.lower()) + else: + self.failUnlessEqual(self.s.portstr, serial.device(PORT)) + # test internals + self.failUnlessEqual(self.s._port, PORT) + # test on the fly change + self.s.open() + self.failUnless(self.s.isOpen()) + try: + self.s.port = 0 + except serial.SerialException: # port not available on system + pass # can't test on this machine... + else: + self.failUnless(self.s.isOpen()) + self.failUnlessEqual(self.s.port, 0) + self.failUnlessEqual(self.s.portstr, serial.device(0)) + try: + self.s.port = 1 + except serial.SerialException: # port not available on system + pass # can't test on this machine... + else: + self.failUnless(self.s.isOpen()) + self.failUnlessEqual(self.s.port, 1) + self.failUnlessEqual(self.s.portstr, serial.device(1)) + + def test_DoubleOpen(self): + self.s.port = PORT + self.s.open() + # calling open for a second time is an error + self.failUnlessRaises(serial.SerialException, self.s.open) + + + def test_BaudrateSetting(self): + self.s.port = PORT + self.s.open() + for baudrate in (300, 9600, 19200, 115200): + self.s.baudrate = baudrate + # test get method + self.failUnlessEqual(self.s.baudrate, baudrate) + # test internals + self.failUnlessEqual(self.s._baudrate, baudrate) + # test illegal values + for illegal_value in (-300, -1, 'a', None): + self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) + + # skip this test as pyserial now tries to set even non standard baud rates. + # therefore the test can not choose a value that fails on any system. + def disabled_test_BaudrateSetting2(self): + # test illegal values, depending on machine/port some of these may be valid... + self.s.port = PORT + self.s.open() + for illegal_value in (500000, 576000, 921600, 92160): + self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) + + def test_BytesizeSetting(self): + for bytesize in (5,6,7,8): + self.s.bytesize = bytesize + # test get method + self.failUnlessEqual(self.s.bytesize, bytesize) + # test internals + self.failUnlessEqual(self.s._bytesize, bytesize) + # test illegal values + for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None): + self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value) + + def test_ParitySetting(self): + for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD): + self.s.parity = parity + # test get method + self.failUnlessEqual(self.s.parity, parity) + # test internals + self.failUnlessEqual(self.s._parity, parity) + # test illegal values + for illegal_value in (0, 57, 'a', None): + self.failUnlessRaises(ValueError, self.s.setParity, illegal_value) + + def test_StopbitsSetting(self): + for stopbits in (1, 2): + self.s.stopbits = stopbits + # test get method + self.failUnlessEqual(self.s.stopbits, stopbits) + # test internals + self.failUnlessEqual(self.s._stopbits, stopbits) + # test illegal values + for illegal_value in (0, 3, 2.5, 57, 'a', None): + self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value) + + def test_TimeoutSetting(self): + for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600): + self.s.timeout = timeout + # test get method + self.failUnlessEqual(self.s.timeout, timeout) + # test internals + self.failUnlessEqual(self.s._timeout, timeout) + # test illegal values + for illegal_value in (-1, 'a'): + self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value) + + def test_XonXoffSetting(self): + for xonxoff in (True, False): + self.s.xonxoff = xonxoff + # test get method + self.failUnlessEqual(self.s.xonxoff, xonxoff) + # test internals + self.failUnlessEqual(self.s._xonxoff, xonxoff) + # no illegal values here, normal rules for the boolean value of an + # object are used thus all objects have a truth value. + + def test_RtsCtsSetting(self): + for rtscts in (True, False): + self.s.rtscts = rtscts + # test get method + self.failUnlessEqual(self.s.rtscts, rtscts) + # test internals + self.failUnlessEqual(self.s._rtscts, rtscts) + # no illegal values here, normal rules for the boolean value of an + # object are used thus all objects have a truth value. + + # this test does not work anymore since serial_for_url that is used + # now, already sets a port + def disabled_test_UnconfiguredPort(self): + # an unconfigured port cannot be opened + self.failUnlessRaises(serial.SerialException, self.s.open) + + def test_PortOpenClose(self): + self.s.port = PORT + for i in range(3): + # open the port and check flag + self.failUnless(not self.s.isOpen()) + self.s.open() + self.failUnless(self.s.isOpen()) + self.s.close() + self.failUnless(not self.s.isOpen()) + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: %r\n" % PORT) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_high_load.py b/test/test_high_load.py new file mode 100644 index 0000000..87f5ce0 --- /dev/null +++ b/test/test_high_load.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +#Python Serial Port Extension for Win32, Linux, BSD, Jython +#see __init__.py +# +#(C) 2001-2003 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest +import sys +import serial + +# on which port should the tests be performed: +PORT = 0 +BAUDRATE = 115200 +#~ BAUDRATE=9600 + +if sys.version_info >= (3, 0): + bytes_0to255 = bytes(range(256)) +else: + bytes_0to255 = ''.join([chr(x) for x in range(256)]) + + +class TestHighLoad(unittest.TestCase): + """Test sending and receiving large amount of data""" + + N = 16 + #~ N = 1 + + def setUp(self): + self.s = serial.serial_for_url(PORT, BAUDRATE, timeout=10) + + def tearDown(self): + self.s.close() + + def test0_WriteReadLoopback(self): + """Send big strings, write/read order.""" + for i in range(self.N): + q = bytes_0to255 + self.s.write(q) + self.failUnlessEqual(self.s.read(len(q)), q) # expected same which was written before + self.failUnlessEqual(self.s.inWaiting(), 0) # expected empty buffer after all sent chars are read + + def test1_WriteWriteReadLoopback(self): + """Send big strings, multiple write one read.""" + q = bytes_0to255 + for i in range(self.N): + self.s.write(q) + read = self.s.read(len(q)*self.N) + self.failUnlessEqual(read, q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q))) + self.failUnlessEqual(self.s.inWaiting(), 0) # "expected empty buffer after all sent chars are read") + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: %r\n" % PORT) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_iolib.py b/test/test_iolib.py new file mode 100644 index 0000000..8d76e45 --- /dev/null +++ b/test/test_iolib.py @@ -0,0 +1,80 @@ +#! /usr/bin/env python +# +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2001-2009 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +This modules contains test for the interaction between Serial and the io +library. This only works on Python 2.6+ that introduced the io library. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest +import sys + +if __name__ == '__main__' and sys.version_info < (2, 6): + sys.stderr.write("""\ +============================================================================== +WARNING: this test is intended for Python 2.6 and newer where the io library +is available. This seems to be an older version of Python running. +Continuing anyway... +============================================================================== +""") + +import io +import serial + +# trick to make that this test run under 2.6 and 3.x without modification. +# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't +# like u'nicode' strings with the prefix and it is not providing an unicode +# function ('str' is now what 'unicode' used to be) +if sys.version_info >= (3, 0): + def unicode(x): return x + + +# on which port should the tests be performed: +PORT = 0 + +class Test_SerialAndIO(unittest.TestCase): + + def setUp(self): + self.s = serial.serial_for_url(PORT, timeout=1) + #~ self.io = io.TextIOWrapper(self.s) + self.io = io.TextIOWrapper(io.BufferedRWPair(self.s, self.s)) + + def tearDown(self): + self.s.close() + + def test_hello_raw(self): + self.io.write(unicode("hello\n")) + self.io.flush() # it is buffering. required to get the data out + hello = self.io.readline() + self.failUnlessEqual(hello, unicode("hello\n")) + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: %r\n" % PORT) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_readline.py b/test/test_readline.py new file mode 100644 index 0000000..677ce85 --- /dev/null +++ b/test/test_readline.py @@ -0,0 +1,107 @@ +#! /usr/bin/env python +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2010 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2010 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest +import threading +import time +import sys +import serial + +#~ print serial.VERSION + +# on which port should the tests be performed: +PORT = 0 + +if sys.version_info >= (3, 0): + def data(string): + return bytes(string, 'latin1') +else: + def data(string): return string + + + +class Test_Readline(unittest.TestCase): + """Test readline function""" + + def setUp(self): + self.s = serial.serial_for_url(PORT, timeout=1) + + def tearDown(self): + self.s.close() + + def test_readline(self): + """Test readline method""" + self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) + self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x31, 0x0a])) + self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x32, 0x0a])) + self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x33, 0x0a])) + # this time we will get a timeout + self.failUnlessEqual(self.s.readline(), serial.to_bytes([])) + + def test_readlines(self): + """Test readlines method""" + self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) + self.failUnlessEqual( + self.s.readlines(), + [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] + ) + + def test_xreadlines(self): + """Test xreadlines method (skipped for io based systems)""" + if hasattr(self.s, 'xreadlines'): + self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) + self.failUnlessEqual( + list(self.s.xreadlines()), + [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] + ) + + def test_for_in(self): + """Test for line in s""" + self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a])) + lines = [] + for line in self.s: + lines.append(line) + self.failUnlessEqual( + lines, + [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])] + ) + + def test_alternate_eol(self): + """Test readline with alternative eol settings (skipped for io based systems)""" + if hasattr(self.s, 'xreadlines'): # test if it is our FileLike base class + self.s.write(serial.to_bytes("no\rno\nyes\r\n")) + self.failUnlessEqual( + self.s.readline(eol=serial.to_bytes("\r\n")), + serial.to_bytes("no\rno\nyes\r\n")) + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: %r\n" % PORT) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_url.py b/test/test_url.py new file mode 100644 index 0000000..700bdbb --- /dev/null +++ b/test/test_url.py @@ -0,0 +1,54 @@ +#! /usr/bin/env python +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2001-2008 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pySerial (http://pyserial.sf.net) (C)2001-2011 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +Cover some of the aspects of serial_for_url and the extension mechanism. +""" + +import unittest +import time +import sys +import serial + + +class Test_URL(unittest.TestCase): + """Test serial_for_url""" + + def test_loop(self): + """loop interface""" + s = serial.serial_for_url('loop://', do_not_open=True) + + def test_bad_url(self): + """invalid protocol specified""" + self.failUnlessRaises(ValueError, serial.serial_for_url, "imnotknown://") + + def test_custom_url(self): + """custom protocol handlers""" + # it's unknown + self.failUnlessRaises(ValueError, serial.serial_for_url, "test://") + # add search path + serial.protocol_handler_packages.append('handlers') + # now it should work + s = serial.serial_for_url("test://") + # remove our handler again + serial.protocol_handler_packages.remove('handlers') + # so it should not work anymore + self.failUnlessRaises(ValueError, serial.serial_for_url, "test://") + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() |