summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorcliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2013-10-17 16:04:58 +0000
committercliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2013-10-17 16:04:58 +0000
commit8bec55528827d09937f411e27195ec396993d75c (patch)
treeaae3383121e8513390cfa7362de60348e0e45699 /test
parent5425bb0510ab10b4601139b7fec64a61e2aec543 (diff)
downloadpyserial-git-8bec55528827d09937f411e27195ec396993d75c.tar.gz
Diffstat (limited to 'test')
-rw-r--r--test/handlers/__init__.py0
-rw-r--r--test/handlers/protocol_test.py202
-rw-r--r--test/run_all_tests.py53
-rw-r--r--test/test.py230
-rw-r--r--test/test_advanced.py188
-rw-r--r--test/test_high_load.py77
-rw-r--r--test/test_iolib.py80
-rw-r--r--test/test_readline.py107
-rw-r--r--test/test_url.py54
9 files changed, 991 insertions, 0 deletions
diff --git a/test/handlers/__init__.py b/test/handlers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/handlers/__init__.py
diff --git a/test/handlers/protocol_test.py b/test/handlers/protocol_test.py
new file mode 100644
index 0000000..42ac4b2
--- /dev/null
+++ b/test/handlers/protocol_test.py
@@ -0,0 +1,202 @@
+#! python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# This module implements a URL dummy handler for serial_for_url.
+#
+# (C) 2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# URL format: test://
+
+from serial.serialutil import *
+import time
+import socket
+import logging
+
+# map log level names to constants. used in fromURL()
+LOGGER_LEVELS = {
+ 'debug': logging.DEBUG,
+ 'info': logging.INFO,
+ 'warning': logging.WARNING,
+ 'error': logging.ERROR,
+ }
+
+class DummySerial(SerialBase):
+ """Serial port implementation for plain sockets."""
+
+ def open(self):
+ """Open port with current settings. This may throw a SerialException
+ if the port cannot be opened."""
+ self.logger = None
+ if self._port is None:
+ raise SerialException("Port must be configured before it can be used.")
+ # not that there anything to configure...
+ self._reconfigurePort()
+ # all things set up get, now a clean start
+ self._isOpen = True
+
+ def _reconfigurePort(self):
+ """Set communication parameters on opened port. for the test://
+ protocol all settings are ignored!"""
+ if self.logger:
+ self.logger.info('ignored port configuration change')
+
+ def close(self):
+ """Close port"""
+ if self._isOpen:
+ self._isOpen = False
+
+ def makeDeviceName(self, port):
+ raise SerialException("there is no sensible way to turn numbers into URLs")
+
+ def fromURL(self, url):
+ """extract host and port from an URL string"""
+ if url.lower().startswith("test://"): url = url[7:]
+ try:
+ # is there a "path" (our options)?
+ if '/' in url:
+ # cut away options
+ url, options = url.split('/', 1)
+ # process options now, directly altering self
+ for option in options.split('/'):
+ if '=' in option:
+ option, value = option.split('=', 1)
+ else:
+ value = None
+ if option == 'logging':
+ logging.basicConfig() # XXX is that good to call it here?
+ self.logger = logging.getLogger('pySerial.test')
+ self.logger.setLevel(LOGGER_LEVELS[value])
+ self.logger.debug('enabled logging')
+ else:
+ raise ValueError('unknown option: %r' % (option,))
+ except ValueError as e:
+ raise SerialException('expected a string in the form "[test://][option[/option...]]": %s' % e)
+ return (host, port)
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
+
+ def inWaiting(self):
+ """Return the number of characters currently in the input buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ # set this one to debug as the function could be called often...
+ self.logger.debug('WARNING: inWaiting returns dummy value')
+ return 0 # hmmm, see comment in read()
+
+ def read(self, size=1):
+ """Read size bytes from the serial port. If a timeout is set it may
+ return less characters as requested. With no timeout it will block
+ until the requested number of bytes is read."""
+ if not self._isOpen: raise portNotOpenError
+ data = '123' # dummy data
+ return bytes(data)
+
+ def write(self, data):
+ """Output the given string over the serial port. Can block if the
+ connection is blocked. May raise SerialException if the connection is
+ closed."""
+ if not self._isOpen: raise portNotOpenError
+ # nothing done
+ return len(data)
+
+ def flushInput(self):
+ """Clear input buffer, discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushInput')
+
+ def flushOutput(self):
+ """Clear output buffer, aborting the current output and
+ discarding all that is in the buffer."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored flushOutput')
+
+ def sendBreak(self, duration=0.25):
+ """Send break condition. Timed, returns to idle state after given
+ duration."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored sendBreak(%r)' % (duration,))
+
+ def setBreak(self, level=True):
+ """Set break: Controls TXD. When active, to transmitting is
+ possible."""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setBreak(%r)' % (level,))
+
+ def setRTS(self, level=True):
+ """Set terminal status line: Request To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setRTS(%r)' % (level,))
+
+ def setDTR(self, level=True):
+ """Set terminal status line: Data Terminal Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('ignored setDTR(%r)' % (level,))
+
+ def getCTS(self):
+ """Read terminal status line: Clear To Send"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCTS()')
+ return True
+
+ def getDSR(self):
+ """Read terminal status line: Data Set Ready"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getDSR()')
+ return True
+
+ def getRI(self):
+ """Read terminal status line: Ring Indicator"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getRI()')
+ return False
+
+ def getCD(self):
+ """Read terminal status line: Carrier Detect"""
+ if not self._isOpen: raise portNotOpenError
+ if self.logger:
+ self.logger.info('returning dummy for getCD()')
+ return True
+
+ # - - - platform specific - - -
+ # None so far
+
+
+# assemble Serial class with the platform specific implementation and the base
+# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
+# library, derive from io.RawIOBase
+try:
+ import io
+except ImportError:
+ # classic version with our own file-like emulation
+ class Serial(DummySerial, FileLike):
+ pass
+else:
+ # io library present
+ class Serial(DummySerial, io.RawIOBase):
+ pass
+
+
+# simple client test
+if __name__ == '__main__':
+ import sys
+ s = Serial('test://logging=debug')
+ sys.stdout.write('%s\n' % s)
+
+ sys.stdout.write("write...\n")
+ s.write("hello\n")
+ s.flush()
+ sys.stdout.write("read: %s\n" % s.read(5))
+
+ s.close()
diff --git a/test/run_all_tests.py b/test/run_all_tests.py
new file mode 100644
index 0000000..e7f115d
--- /dev/null
+++ b/test/run_all_tests.py
@@ -0,0 +1,53 @@
+#! /usr/bin/env python
+
+"""\
+UnitTest runner. This one searches for all files named test_*.py and collects
+all test cases from these files. Finally it runs all tests and prints a
+summary.
+"""
+
+import unittest
+import sys
+import os
+import time
+
+# inject local copy to avoid testing the installed version instead of the
+# working copy (only for 2.x as the sources would need to be translated with
+# 2to3 for Python 3, use installed module instead for Python 3).
+if sys.version_info < (3, 0):
+ sys.path.insert(0, '..')
+
+import serial
+print("Patching sys.path to test local version. Testing Version: %s" % (serial.VERSION,))
+
+PORT = 'loop://'
+if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+
+# find files and the tests in them
+mainsuite = unittest.TestSuite()
+for modulename in [os.path.splitext(x)[0]
+ for x in os.listdir('.')
+ if x != __file__ and x.startswith("test") and x.endswith(".py")
+]:
+ try:
+ module = __import__(modulename)
+ except ImportError:
+ print("skipping %s" % (modulename,))
+ else:
+ module.PORT = PORT
+ testsuite = unittest.findTestCases(module)
+ print("found %s tests in %r" % (testsuite.countTestCases(), modulename))
+ mainsuite.addTest(testsuite)
+
+verbosity = 1
+if '-v' in sys.argv[1:]:
+ verbosity = 2
+
+# run the collected tests
+testRunner = unittest.TextTestRunner(verbosity=verbosity)
+#~ testRunner = unittest.ConsoleTestRunner(verbosity=verbosity)
+result = testRunner.run(mainsuite)
+
+# set exit code accordingly to test results
+sys.exit(not result.wasSuccessful())
diff --git a/test/test.py b/test/test.py
new file mode 100644
index 0000000..25a34c3
--- /dev/null
+++ b/test/test.py
@@ -0,0 +1,230 @@
+#! /usr/bin/env python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest
+import threading
+import time
+import sys
+import serial
+
+# on which port should the tests be performed:
+PORT = 0
+
+if sys.version_info >= (3, 0):
+ def data(string):
+ return bytes(string, 'latin1')
+ bytes_0to255 = bytes(range(256))
+else:
+ def data(string): return string
+ bytes_0to255 = ''.join([chr(x) for x in range(256)])
+
+
+def segments(data, size=16):
+ for a in range(0, len(data), size):
+ yield data[a:a+size]
+
+
+class Test4_Nonblocking(unittest.TestCase):
+ """Test with timeouts"""
+ timeout = 0
+
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, timeout=self.timeout)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test0_Messy(self):
+ """NonBlocking (timeout=0)"""
+ # this is only here to write out the message in verbose mode
+ # because Test3 and Test4 print the same messages
+
+ def test1_ReadEmpty(self):
+ """timeout: After port open, the input buffer must be empty"""
+ self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer")
+
+ def test2_Loopback(self):
+ """timeout: each sent character should return (binary test).
+ this is also a test for the binary capability of a port."""
+ for block in segments(bytes_0to255):
+ length = len(block)
+ self.s.write(block)
+ # there might be a small delay until the character is ready (especially on win32)
+ time.sleep(0.05)
+ self.failUnlessEqual(self.s.inWaiting(), length, "expected exactly %d character for inWainting()" % length)
+ self.failUnlessEqual(self.s.read(length), block)#, "expected a %r which was written before" % block)
+ self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
+
+ def test2_LoopbackTimeout(self):
+ """timeout: test the timeout/immediate return.
+ partial results should be returned."""
+ self.s.write(data("HELLO"))
+ time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32 and rfc2217)
+ # read more characters as are available to run in the timeout
+ self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before")
+ self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
+
+
+class Test3_Timeout(Test4_Nonblocking):
+ """Same tests as the NonBlocking ones but this time with timeout"""
+ timeout = 1
+
+ def test0_Messy(self):
+ """Blocking (timeout=1)"""
+ # this is only here to write out the message in verbose mode
+ # because Test3 and Test4 print the same messages
+
+
+class SendEvent(threading.Thread):
+ def __init__(self, serial, delay=3):
+ threading.Thread.__init__(self)
+ self.serial = serial
+ self.delay = delay
+ self.x = threading.Event()
+ self.stopped = 0
+ self.start()
+
+ def run(self):
+ time.sleep(self.delay)
+ self.x.set()
+ if not self.stopped:
+ self.serial.write(data("E"))
+ self.serial.flush()
+
+ def isSet(self):
+ return self.x.isSet()
+
+ def stop(self):
+ self.stopped = 1
+ self.x.wait()
+
+class Test1_Forever(unittest.TestCase):
+ """Tests a port with no timeout. These tests require that a
+ character is sent after some time to stop the test, this is done
+ through the SendEvent class and the Loopback HW."""
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, timeout=None)
+ self.event = SendEvent(self.s)
+
+ def tearDown(self):
+ self.event.stop()
+ self.s.close()
+
+ def test2_ReadEmpty(self):
+ """no timeout: after port open, the input buffer must be empty (read).
+ a character is sent after some time to terminate the test (SendEvent)."""
+ c = self.s.read(1)
+ if not (self.event.isSet() and c == data('E')):
+ self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c))
+
+
+class Test2_Forever(unittest.TestCase):
+ """Tests a port with no timeout"""
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, timeout=None)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test1_inWaitingEmpty(self):
+ """no timeout: after port open, the input buffer must be empty (inWaiting)"""
+ self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer")
+
+ def test2_Loopback(self):
+ """no timeout: each sent character should return (binary test).
+ this is also a test for the binary capability of a port."""
+ for block in segments(bytes_0to255):
+ length = len(block)
+ self.s.write(block)
+ # there might be a small delay until the character is ready (especially on win32 and rfc2217)
+ time.sleep(0.05)
+ self.failUnlessEqual(self.s.inWaiting(), length)#, "expected exactly %d character for inWainting()" % length)
+ self.failUnlessEqual(self.s.read(length), block) #, "expected %r which was written before" % block)
+ self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer after all sent chars are read")
+
+
+class Test0_DataWires(unittest.TestCase):
+ """Test modem control lines"""
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test1_RTS(self):
+ """Test RTS/CTS"""
+ self.s.setRTS(0)
+ time.sleep(1.1)
+ self.failUnless(not self.s.getCTS(), "CTS -> 0")
+ self.s.setRTS(1)
+ time.sleep(1.1)
+ self.failUnless(self.s.getCTS(), "CTS -> 1")
+
+ def test2_DTR(self):
+ """Test DTR/DSR"""
+ self.s.setDTR(0)
+ time.sleep(1.1)
+ self.failUnless(not self.s.getDSR(), "DSR -> 0")
+ self.s.setDTR(1)
+ time.sleep(1.1)
+ self.failUnless(self.s.getDSR(), "DSR -> 1")
+
+ def test3_RI(self):
+ """Test RI"""
+ self.failUnless(not self.s.getRI(), "RI -> 0")
+
+
+class Test_MoreTimeouts(unittest.TestCase):
+ """Test with timeouts"""
+ def setUp(self):
+ # create an closed serial port
+ self.s = serial.serial_for_url(PORT, do_not_open=True)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_WriteTimeout(self):
+ """Test write() timeout."""
+ # use xonxoff setting and the loop-back adapter to switch traffic on hold
+ self.s.port = PORT
+ self.s.writeTimeout = 1
+ self.s.xonxoff = 1
+ self.s.open()
+ self.s.write(serial.XOFF)
+ time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
+ t1 = time.time()
+ self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200))
+ t2 = time.time()
+ self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1))
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_advanced.py b/test/test_advanced.py
new file mode 100644
index 0000000..0d85e2a
--- /dev/null
+++ b/test/test_advanced.py
@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+# needs at least python 2.2.3
+
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2003 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+These tests open a serial port and change all the settings on the fly.
+If the port is really correctly configured cannot be determined - that
+would require external hardware or a null modem cable and an other
+serial port library... Thus it mainly tests that all features are
+correctly implemented and that the interface does what it should.
+
+"""
+
+import unittest
+import serial
+
+# on which port should the tests be performed:
+PORT = 0
+
+class Test_ChangeAttributes(unittest.TestCase):
+ """Test with timeouts"""
+
+ def setUp(self):
+ # create a closed serial port
+ self.s = serial.serial_for_url(PORT, do_not_open=True)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_PortSetting(self):
+ self.s.port = PORT
+ # portstr has to be set
+ if isinstance(PORT, str):
+ self.failUnlessEqual(self.s.portstr.lower(), PORT.lower())
+ else:
+ self.failUnlessEqual(self.s.portstr, serial.device(PORT))
+ # test internals
+ self.failUnlessEqual(self.s._port, PORT)
+ # test on the fly change
+ self.s.open()
+ self.failUnless(self.s.isOpen())
+ try:
+ self.s.port = 0
+ except serial.SerialException: # port not available on system
+ pass # can't test on this machine...
+ else:
+ self.failUnless(self.s.isOpen())
+ self.failUnlessEqual(self.s.port, 0)
+ self.failUnlessEqual(self.s.portstr, serial.device(0))
+ try:
+ self.s.port = 1
+ except serial.SerialException: # port not available on system
+ pass # can't test on this machine...
+ else:
+ self.failUnless(self.s.isOpen())
+ self.failUnlessEqual(self.s.port, 1)
+ self.failUnlessEqual(self.s.portstr, serial.device(1))
+
+ def test_DoubleOpen(self):
+ self.s.port = PORT
+ self.s.open()
+ # calling open for a second time is an error
+ self.failUnlessRaises(serial.SerialException, self.s.open)
+
+
+ def test_BaudrateSetting(self):
+ self.s.port = PORT
+ self.s.open()
+ for baudrate in (300, 9600, 19200, 115200):
+ self.s.baudrate = baudrate
+ # test get method
+ self.failUnlessEqual(self.s.baudrate, baudrate)
+ # test internals
+ self.failUnlessEqual(self.s._baudrate, baudrate)
+ # test illegal values
+ for illegal_value in (-300, -1, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
+
+ # skip this test as pyserial now tries to set even non standard baud rates.
+ # therefore the test can not choose a value that fails on any system.
+ def disabled_test_BaudrateSetting2(self):
+ # test illegal values, depending on machine/port some of these may be valid...
+ self.s.port = PORT
+ self.s.open()
+ for illegal_value in (500000, 576000, 921600, 92160):
+ self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
+
+ def test_BytesizeSetting(self):
+ for bytesize in (5,6,7,8):
+ self.s.bytesize = bytesize
+ # test get method
+ self.failUnlessEqual(self.s.bytesize, bytesize)
+ # test internals
+ self.failUnlessEqual(self.s._bytesize, bytesize)
+ # test illegal values
+ for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value)
+
+ def test_ParitySetting(self):
+ for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
+ self.s.parity = parity
+ # test get method
+ self.failUnlessEqual(self.s.parity, parity)
+ # test internals
+ self.failUnlessEqual(self.s._parity, parity)
+ # test illegal values
+ for illegal_value in (0, 57, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
+
+ def test_StopbitsSetting(self):
+ for stopbits in (1, 2):
+ self.s.stopbits = stopbits
+ # test get method
+ self.failUnlessEqual(self.s.stopbits, stopbits)
+ # test internals
+ self.failUnlessEqual(self.s._stopbits, stopbits)
+ # test illegal values
+ for illegal_value in (0, 3, 2.5, 57, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value)
+
+ def test_TimeoutSetting(self):
+ for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600):
+ self.s.timeout = timeout
+ # test get method
+ self.failUnlessEqual(self.s.timeout, timeout)
+ # test internals
+ self.failUnlessEqual(self.s._timeout, timeout)
+ # test illegal values
+ for illegal_value in (-1, 'a'):
+ self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value)
+
+ def test_XonXoffSetting(self):
+ for xonxoff in (True, False):
+ self.s.xonxoff = xonxoff
+ # test get method
+ self.failUnlessEqual(self.s.xonxoff, xonxoff)
+ # test internals
+ self.failUnlessEqual(self.s._xonxoff, xonxoff)
+ # no illegal values here, normal rules for the boolean value of an
+ # object are used thus all objects have a truth value.
+
+ def test_RtsCtsSetting(self):
+ for rtscts in (True, False):
+ self.s.rtscts = rtscts
+ # test get method
+ self.failUnlessEqual(self.s.rtscts, rtscts)
+ # test internals
+ self.failUnlessEqual(self.s._rtscts, rtscts)
+ # no illegal values here, normal rules for the boolean value of an
+ # object are used thus all objects have a truth value.
+
+ # this test does not work anymore since serial_for_url that is used
+ # now, already sets a port
+ def disabled_test_UnconfiguredPort(self):
+ # an unconfigured port cannot be opened
+ self.failUnlessRaises(serial.SerialException, self.s.open)
+
+ def test_PortOpenClose(self):
+ self.s.port = PORT
+ for i in range(3):
+ # open the port and check flag
+ self.failUnless(not self.s.isOpen())
+ self.s.open()
+ self.failUnless(self.s.isOpen())
+ self.s.close()
+ self.failUnless(not self.s.isOpen())
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_high_load.py b/test/test_high_load.py
new file mode 100644
index 0000000..87f5ce0
--- /dev/null
+++ b/test/test_high_load.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+#Python Serial Port Extension for Win32, Linux, BSD, Jython
+#see __init__.py
+#
+#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest
+import sys
+import serial
+
+# on which port should the tests be performed:
+PORT = 0
+BAUDRATE = 115200
+#~ BAUDRATE=9600
+
+if sys.version_info >= (3, 0):
+ bytes_0to255 = bytes(range(256))
+else:
+ bytes_0to255 = ''.join([chr(x) for x in range(256)])
+
+
+class TestHighLoad(unittest.TestCase):
+ """Test sending and receiving large amount of data"""
+
+ N = 16
+ #~ N = 1
+
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, BAUDRATE, timeout=10)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test0_WriteReadLoopback(self):
+ """Send big strings, write/read order."""
+ for i in range(self.N):
+ q = bytes_0to255
+ self.s.write(q)
+ self.failUnlessEqual(self.s.read(len(q)), q) # expected same which was written before
+ self.failUnlessEqual(self.s.inWaiting(), 0) # expected empty buffer after all sent chars are read
+
+ def test1_WriteWriteReadLoopback(self):
+ """Send big strings, multiple write one read."""
+ q = bytes_0to255
+ for i in range(self.N):
+ self.s.write(q)
+ read = self.s.read(len(q)*self.N)
+ self.failUnlessEqual(read, q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q)))
+ self.failUnlessEqual(self.s.inWaiting(), 0) # "expected empty buffer after all sent chars are read")
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_iolib.py b/test/test_iolib.py
new file mode 100644
index 0000000..8d76e45
--- /dev/null
+++ b/test/test_iolib.py
@@ -0,0 +1,80 @@
+#! /usr/bin/env python
+#
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+This modules contains test for the interaction between Serial and the io
+library. This only works on Python 2.6+ that introduced the io library.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest
+import sys
+
+if __name__ == '__main__' and sys.version_info < (2, 6):
+ sys.stderr.write("""\
+==============================================================================
+WARNING: this test is intended for Python 2.6 and newer where the io library
+is available. This seems to be an older version of Python running.
+Continuing anyway...
+==============================================================================
+""")
+
+import io
+import serial
+
+# trick to make that this test run under 2.6 and 3.x without modification.
+# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't
+# like u'nicode' strings with the prefix and it is not providing an unicode
+# function ('str' is now what 'unicode' used to be)
+if sys.version_info >= (3, 0):
+ def unicode(x): return x
+
+
+# on which port should the tests be performed:
+PORT = 0
+
+class Test_SerialAndIO(unittest.TestCase):
+
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, timeout=1)
+ #~ self.io = io.TextIOWrapper(self.s)
+ self.io = io.TextIOWrapper(io.BufferedRWPair(self.s, self.s))
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_hello_raw(self):
+ self.io.write(unicode("hello\n"))
+ self.io.flush() # it is buffering. required to get the data out
+ hello = self.io.readline()
+ self.failUnlessEqual(hello, unicode("hello\n"))
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_readline.py b/test/test_readline.py
new file mode 100644
index 0000000..677ce85
--- /dev/null
+++ b/test/test_readline.py
@@ -0,0 +1,107 @@
+#! /usr/bin/env python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2010 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2010 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest
+import threading
+import time
+import sys
+import serial
+
+#~ print serial.VERSION
+
+# on which port should the tests be performed:
+PORT = 0
+
+if sys.version_info >= (3, 0):
+ def data(string):
+ return bytes(string, 'latin1')
+else:
+ def data(string): return string
+
+
+
+class Test_Readline(unittest.TestCase):
+ """Test readline function"""
+
+ def setUp(self):
+ self.s = serial.serial_for_url(PORT, timeout=1)
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_readline(self):
+ """Test readline method"""
+ self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
+ self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x31, 0x0a]))
+ self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x32, 0x0a]))
+ self.failUnlessEqual(self.s.readline(), serial.to_bytes([0x33, 0x0a]))
+ # this time we will get a timeout
+ self.failUnlessEqual(self.s.readline(), serial.to_bytes([]))
+
+ def test_readlines(self):
+ """Test readlines method"""
+ self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
+ self.failUnlessEqual(
+ self.s.readlines(),
+ [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])]
+ )
+
+ def test_xreadlines(self):
+ """Test xreadlines method (skipped for io based systems)"""
+ if hasattr(self.s, 'xreadlines'):
+ self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
+ self.failUnlessEqual(
+ list(self.s.xreadlines()),
+ [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])]
+ )
+
+ def test_for_in(self):
+ """Test for line in s"""
+ self.s.write(serial.to_bytes([0x31, 0x0a, 0x32, 0x0a, 0x33, 0x0a]))
+ lines = []
+ for line in self.s:
+ lines.append(line)
+ self.failUnlessEqual(
+ lines,
+ [serial.to_bytes([0x31, 0x0a]), serial.to_bytes([0x32, 0x0a]), serial.to_bytes([0x33, 0x0a])]
+ )
+
+ def test_alternate_eol(self):
+ """Test readline with alternative eol settings (skipped for io based systems)"""
+ if hasattr(self.s, 'xreadlines'): # test if it is our FileLike base class
+ self.s.write(serial.to_bytes("no\rno\nyes\r\n"))
+ self.failUnlessEqual(
+ self.s.readline(eol=serial.to_bytes("\r\n")),
+ serial.to_bytes("no\rno\nyes\r\n"))
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_url.py b/test/test_url.py
new file mode 100644
index 0000000..700bdbb
--- /dev/null
+++ b/test/test_url.py
@@ -0,0 +1,54 @@
+#! /usr/bin/env python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pySerial (http://pyserial.sf.net) (C)2001-2011 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+Cover some of the aspects of serial_for_url and the extension mechanism.
+"""
+
+import unittest
+import time
+import sys
+import serial
+
+
+class Test_URL(unittest.TestCase):
+ """Test serial_for_url"""
+
+ def test_loop(self):
+ """loop interface"""
+ s = serial.serial_for_url('loop://', do_not_open=True)
+
+ def test_bad_url(self):
+ """invalid protocol specified"""
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "imnotknown://")
+
+ def test_custom_url(self):
+ """custom protocol handlers"""
+ # it's unknown
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "test://")
+ # add search path
+ serial.protocol_handler_packages.append('handlers')
+ # now it should work
+ s = serial.serial_for_url("test://")
+ # remove our handler again
+ serial.protocol_handler_packages.remove('handlers')
+ # so it should not work anymore
+ self.failUnlessRaises(ValueError, serial.serial_for_url, "test://")
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()