diff options
37 files changed, 689 insertions, 305 deletions
diff --git a/.travis.yml b/.travis.yml index 0b47c2d..80d630e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,4 +13,4 @@ python: script: - python setup.py install - - python test/test.py loop:// + - python test/run_all_tests.py loop:// diff --git a/CHANGES.rst b/CHANGES.rst index 9166c18..e960e61 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -649,8 +649,28 @@ Bugfixes (win32): Version 3.x.x 2016-xx-xx -------------------------- +Improvements: + +- add client mode to exmaple tcp_serial_redirect.py +- use of monotonic clock for timeouts, when available (Python 3.3 and up) +- [#169] arbitrary baudrate support for BSD family +- improve tests, improve ``loop://`` + +Bugfixes: + +- [#137] Exception while cancel in miniterm (python3) +- [#143] Class Serial in protocol_loop.py references variable before assigning + to it +- [#149] Python 3 fix for threaded.FramedPacket + Bugfixes (posix): - [#133] _update_dtr_state throws Inappropriate ioctl for virtual serial port created by socat on OS X +- [#157] Broken handling of CMSPAR in serialposix.py + +Bugfixes (win32): +- [#144] Use Unicode API for list_ports +- [#145] list_ports_windows: support devices with only VID +- [#162] Write in non-blocking mode returns incorrect value on windows diff --git a/documentation/appendix.rst b/documentation/appendix.rst index 5d8bae0..8bc2c1a 100644 --- a/documentation/appendix.rst +++ b/documentation/appendix.rst @@ -5,10 +5,18 @@ How To ====== -Enable :rfc:`2217` in programs using pySerial. - Patch the code where the :class:`serial.Serial` is instantiated. Replace +Enable :rfc:`2217` (and other URL handlers) in programs using pySerial. + Patch the code where the :class:`serial.Serial` is instantiated. + E.g. replace:: + + s = serial.Serial(...) + it with:: + s = serial.serial_for_url(...) + + or for backwards compatibility to old pySerial installations:: + try: s = serial.serial_for_url(...) except AttributeError: @@ -33,6 +41,10 @@ Test your setup. on the screen, then at least RX and TX work (they still could be swapped though). + There is also a ``spy:://`` URL handler. It prints all calls (read/write, + control lines) to the serial port to a file or stderr. See :ref:`spy` + for details. + FAQ === @@ -71,6 +83,14 @@ User supplied URL handlers search path in :data:`serial.protocol_handler_packages`. This is possible starting from pySerial V2.6. +``Permission denied`` errors + On POSIX based systems, the user usually needs to be in a special group to + have access to serial ports. + + On Debian based systems, serial ports are usually in the group ``dialout``, + so running ``sudo adduser $USER dialout`` (and logging-out and -in) enables + the user to access the port. + Related software ================ diff --git a/documentation/pyserial.rst b/documentation/pyserial.rst index 2a7fe42..602134d 100644 --- a/documentation/pyserial.rst +++ b/documentation/pyserial.rst @@ -46,54 +46,58 @@ Features Requirements ============ -- Python 2.7 or newer, including Python 3.4 and newer -- "Java Communications" (JavaComm) or compatible extension for Java/Jython +- Python 2.7 or Python 3.4 and newer + +- If running on Windows: Something newer than WinXP + +- If running on Jython: "Java Communications" (JavaComm) or compatible + extension for Java + +For older installations (older Python versions or older operating systems), see +`older versions`_ below. Installation ============ -pyserial --------- This installs a package that can be used from Python (``import serial``). To install for all users on the system, administrator rights (root) may be required. From PyPI -~~~~~~~~~ -pySerial can be installed from PyPI, either manually downloading the -files and installing as described below or using:: +--------- +pySerial can be installed from PyPI:: - pip install pyserial + python -m pip install pyserial -or:: +Using the `python`/`python3` executable of the desired version (2.7/3.x). - easy_install -U pyserial +Developers also may be interested to get the source archive, because it +contains examples, tests and the this documentation. -From source (tar.gz or checkout) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Download the archive from http://pypi.python.org/pypi/pyserial. +From source (zip/tar.gz or checkout) +------------------------------------ +Download the archive from http://pypi.python.org/pypi/pyserial or +https://github.com/pyserial/pyserial/releases. Unpack the archive, enter the ``pyserial-x.y`` directory and run:: python setup.py install -For Python 3.x:: - - python3 setup.py install +Using the `python`/`python3` executable of the desired version (2.7/3.x). Packages -~~~~~~~~ -There are also packaged versions for some Linux distributions and Windows: +-------- +There are also packaged versions for some Linux distributions: -Debian/Ubuntu - A package is available under the name "python-serial". Note that some - distributions may package an older version of pySerial. +- Debian/Ubuntu: "python-serial", "python3-serial" +- Fedora / RHEL / CentOS / EPEL: "pyserial" +- Arch Linux: "python-pyserial" +- Gento: "dev-python/pyserial" -Windows - There is also a Windows installer for end users. It is located in the - PyPi_. Developers also may be interested to get the source archive, - because it contains examples, tests and the this documentation. +Note that some distributions may package an older version of pySerial. +These packages are created and maintained by developers working on +these distributions. .. _PyPi: http://pypi.python.org/pypi/pyserial @@ -102,21 +106,25 @@ References ========== * Python: http://www.python.org/ * Jython: http://www.jython.org/ -* Java@IBM: http://www-106.ibm.com/developerworks/java/jdk/ (JavaComm links are - on the download page for the respective platform JDK) -* Java@SUN: http://java.sun.com/products/ * IronPython: http://www.codeplex.com/IronPython -* setuptools: http://peak.telecommunity.com/DevCenter/setuptools Older Versions ============== -Older versions are still available in the old download_ page. pySerial 1.21 -is compatible with Python 2.0 on Windows, Linux and several un*x like systems, -MacOSX and Jython. +Older versions are still available on the current download_ page or the `old +download`_ page. The last version of pySerial's 2.x series was `2.7`_, +compatible with Python 2.3 and newer and partially with early Python 3.x +versions. + +pySerial `1.21`_ is compatible with Python 2.0 on Windows, Linux and several +un*x like systems, MacOSX and Jython. + +On Windows, releases older than 2.5 will depend on pywin32_ (previously known as +win32all). WinXP is supported up to 3.0.1. -On Windows releases older than 2.5 will depend on pywin32_ (previously known as -win32all) -.. _download: https://pypi.python.org/pypi/pyserial +.. _`old download`: https://sourceforge.net/projects/pyserial/files/pyserial/ +.. _download: https://pypi.python.org/simple/pyserial/ .. _pywin32: http://pypi.python.org/pypi/pywin32 +.. _`2.7`: https://pypi.python.org/pypi/pyserial/2.7 +.. _`1.21`: https://sourceforge.net/projects/pyserial/files/pyserial/1.21/pyserial-1.21.zip/download diff --git a/documentation/pyserial_api.rst b/documentation/pyserial_api.rst index 75e19e7..cec0078 100644 --- a/documentation/pyserial_api.rst +++ b/documentation/pyserial_api.rst @@ -111,7 +111,20 @@ Native ports .. method:: open() - Open port. + Open port. The state of :attr:`rts` and :attr:`dtr` is applied. + + .. note:: + + Some OS and/or drivers may activate RTS and or DTR automatically, + as soon as the port is opened. There may be a glitch on RTS/DTR + when :attr:`rts`` or :attr:`dtr` are set differently from their + default value (``True`` / active). + + .. note:: + + For compatibility reasons, no error is reported when applying + :attr:`rts` or :attr:`dtr` fails on POSIX due to EINVAL (22) or + ENOTTY (25). .. method:: close() @@ -226,7 +239,7 @@ Native ports Set RTS line to specified logic level. It is possible to assign this value before opening the serial port, then the value is applied uppon - :meth:`open`. + :meth:`open` (with restrictions, see :meth:`open`). .. attribute:: dtr @@ -236,7 +249,7 @@ Native ports Set DTR line to specified logic level. It is possible to assign this value before opening the serial port, then the value is applied uppon - :meth:`open`. + :meth:`open` (with restrictions, see :meth:`open`). Read-only attributes: @@ -275,6 +288,10 @@ Native ports Return the state of the CD line + .. attribute:: is_open + + :getter: Get the state of the serial port, whether it's open. + :type: bool New values can be assigned to the following attributes (properties), the port will be reconfigured, even if it's opened at that time: @@ -476,6 +493,34 @@ Native ports .. versionadded:: 2.5 .. versionchanged:: 3.0 renamed from ``applySettingsDict`` + + This class can be used as context manager. The serial port is closed when + the context is left. + + .. method:: __enter__() + + :returns: Serial instance + + Returns the instance that was used in the ``with`` statement. + + Example: + + >>> with serial.serial_for_url(port) as s: + ... s.write(b'hello') + + Here no port argument is given, so it is not opened automatically: + + >>> with serial.Serial() as s: + ... s.port = ... + ... s.open() + ... s.write(b'hello') + + + .. method:: __exit__(exc_type, exc_val, exc_tb) + + Closes serial port. + + Platform specific methods. .. warning:: Programs using the following methods and attributes are not @@ -564,6 +609,10 @@ Native ports .. deprecated:: 3.0 see :attr:`in_waiting` + .. method:: isOpen() + + .. deprecated:: 3.0 see :attr:`is_open` + .. attribute:: writeTimeout .. deprecated:: 3.0 see :attr:`write_timeout` @@ -659,12 +708,14 @@ enable RS485 specific support on some platforms. Currently Windows and Linux Usage:: + import serial + import serial.rs485 ser = serial.Serial(...) ser.rs485_mode = serial.rs485.RS485Settings(...) ser.write(b'hello') There is a subclass :class:`rs485.RS485` available to emulate the RS485 support -on regular serial ports. +on regular serial ports (``serial.rs485`` needs to be imported). .. class:: rs485.RS485Settings @@ -1202,48 +1253,13 @@ Example:: asyncio ======= -.. module:: serial.aio - -.. warning:: This implementation is currently in an experimental state. Use - at your own risk. - -Experimental asyncio support is available for Python 3.4 and newer. The module -:mod:`serial.aio` provides a :class:`asyncio.Transport`: -``SerialTransport``. +``asyncio`` was introduced with Python 3.4. Experimental support for pySerial +is provided via a separate distribution `pyserial-asyncio`_. +It is currently under developement, see: -A factory function (`asyncio.coroutine`) is provided: +- http://pyserial-asyncio.readthedocs.io/ +- https://github.com/pyserial/pyserial-asyncio -.. function:: create_serial_connection(loop, protocol_factory, \*args, \*\*kwargs) - - :param loop: The event handler - :param protocol_factory: Factory function for a :class:`asyncio.Protocol` - :param args: Passed to the :class:`serial.Serial` init function - :param kwargs: Passed to the :class:`serial.Serial` init function - :platform: Posix - - Get a connection making coroutine. - -Example:: - - class Output(asyncio.Protocol): - def connection_made(self, transport): - self.transport = transport - print('port opened', transport) - transport.serial.rts = False - transport.write(b'hello world\n') - - def data_received(self, data): - print('data received', repr(data)) - self.transport.close() - - def connection_lost(self, exc): - print('port closed') - asyncio.get_event_loop().stop() - - loop = asyncio.get_event_loop() - coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200) - loop.run_until_complete(coro) - loop.run_forever() - loop.close() +.. _`pyserial-asyncio`: https://pypi.python.org/pypi/pyserial-asyncio diff --git a/documentation/url_handlers.rst b/documentation/url_handlers.rst index ae331f9..81e6ff8 100644 --- a/documentation/url_handlers.rst +++ b/documentation/url_handlers.rst @@ -122,6 +122,8 @@ Supported options in the URL are: not locked automatically (e.g. Posix). +.. _spy: + ``spy://`` ========== Wrapping the native serial port, this protocol makes it possible to diff --git a/examples/at_protocol.py b/examples/at_protocol.py index 36eb6bd..7d43007 100644 --- a/examples/at_protocol.py +++ b/examples/at_protocol.py @@ -91,7 +91,7 @@ class ATProtocol(serial.threaded.LineReader): else: lines.append(line) except queue.Empty: - raise ATException('AT command timeout (%r)' % (command,)) + raise ATException('AT command timeout ({!r})'.format(command)) # test @@ -123,16 +123,16 @@ if __name__ == '__main__': """Handle events and command responses starting with '+...'""" if event.startswith('+RRBDRES') and self._awaiting_response_for.startswith('AT+JRBD'): rev = event[9:9 + 12] - mac = ':'.join('%02X' % ord(x) for x in rev.decode('hex')[::-1]) + mac = ':'.join('{:02X}'.format(ord(x)) for x in rev.decode('hex')[::-1]) self.event_responses.put(mac) else: - logging.warning('unhandled event: %r' % event) + logging.warning('unhandled event: {!r}'.format(event)) def command_with_event_response(self, command): """Send a command that responds with '+...' line""" with self.lock: # ensure that just one thread is sending commands at once self._awaiting_response_for = command - self.transport.write(b'%s\r\n' % (command.encode(self.ENCODING, self.UNICODE_HANDLING),)) + self.transport.write(b'{}\r\n'.format(command.encode(self.ENCODING, self.UNICODE_HANDLING))) response = self.event_responses.get() self._awaiting_response_for = None return response @@ -143,7 +143,7 @@ if __name__ == '__main__': self.command("AT+JRES", response='ROK') # SW-Reset BT module def get_mac_address(self): - # requests hardware / calibrationinfo as event + # requests hardware / calibration info as event return self.command_with_event_response("AT+JRBD") ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1) diff --git a/examples/port_publisher.py b/examples/port_publisher.py index cf44945..ae07f77 100755 --- a/examples/port_publisher.py +++ b/examples/port_publisher.py @@ -86,7 +86,7 @@ class ZeroconfService: self.group = None def __str__(self): - return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype) + return "{!r} @ {}:{} ({})".format(self.name, self.host, self.port, self.stype) class Forwarder(ZeroconfService): @@ -154,7 +154,7 @@ class Forwarder(ZeroconfService): self.handle_server_error() #~ raise if self.log is not None: - self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port)) + self.log.info("{}: Waiting for connection on {}...".format(self.device, self.network_port)) # zeroconfig self.publish() @@ -165,7 +165,7 @@ class Forwarder(ZeroconfService): def close(self): """Close all resources and unpublish service""" if self.log is not None: - self.log.info("%s: closing..." % (self.device, )) + self.log.info("{}: closing...".format(self.device)) self.alive = False self.unpublish() if self.server_socket: @@ -291,7 +291,7 @@ class Forwarder(ZeroconfService): self.socket.setblocking(0) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self.log is not None: - self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1])) + self.log.warning('{}: Connected by {}:{}'.format(self.device, addr[0], addr[1])) self.serial.rts = True self.serial.dtr = True if self.log is not None: @@ -302,7 +302,7 @@ class Forwarder(ZeroconfService): # reject connection if there is already one connection.close() if self.log is not None: - self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1])) + self.log.warning('{}: Rejecting connect from {}:{}'.format(self.device, addr[0], addr[1])) def handle_server_error(self): """Socket server fails""" @@ -326,7 +326,7 @@ class Forwarder(ZeroconfService): self.socket.close() self.socket = None if self.log is not None: - self.log.warning('%s: Disconnected' % self.device) + self.log.warning('{}: Disconnected'.format(self.device)) def test(): @@ -451,7 +451,7 @@ terminated, it waits for the next connect. # exit first parent sys.exit(0) except OSError as e: - log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + log.critical("fork #1 failed: {} ({})\n".format(e.errno, e.strerror)) sys.exit(1) # decouple from parent environment @@ -465,10 +465,10 @@ terminated, it waits for the next connect. if pid > 0: # exit from second parent, save eventual PID before if args.pidfile is not None: - open(args.pidfile, 'w').write("%d" % pid) + open(args.pidfile, 'w').write("{}".formt(pid)) sys.exit(0) except OSError as e: - log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + log.critical("fork #2 failed: {} ({})\n".format(e.errno, e.strerror)) sys.exit(1) if args.logfile is None: @@ -512,7 +512,7 @@ terminated, it waits for the next connect. except KeyError: pass else: - log.info("unpublish: %s" % (forwarder)) + log.info("unpublish: {}".format(forwarder)) alive = True next_check = 0 @@ -526,7 +526,7 @@ terminated, it waits for the next connect. connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)] # Handle devices that are published, but no longer connected for device in set(published).difference(connected): - log.info("unpublish: %s" % (published[device])) + log.info("unpublish: {}".format(published[device])) unpublish(published[device]) # Handle devices that are connected but not yet published for device in sorted(set(connected).difference(published)): @@ -537,11 +537,11 @@ terminated, it waits for the next connect. port += 1 published[device] = Forwarder( device, - "%s on %s" % (device, hostname), + "{} on {}".format(device, hostname), port, on_close=unpublish, log=log) - log.warning("publish: %s" % (published[device])) + log.warning("publish: {}".format(published[device])) published[device].open() # select_start = time.time() diff --git a/examples/rfc2217_server.py b/examples/rfc2217_server.py index 7830e40..5955fc0 100755 --- a/examples/rfc2217_server.py +++ b/examples/rfc2217_server.py @@ -58,7 +58,7 @@ class Redirector(object): # escape outgoing data when needed (Telnet IAC (0xff) character) self.write(serial.to_bytes(self.rfc2217.escape(data))) except socket.error as msg: - self.log.error('%s' % (msg,)) + self.log.error('{}'.format(msg)) # probably got disconnected break self.alive = False @@ -78,7 +78,7 @@ class Redirector(object): break self.serial.write(serial.to_bytes(self.rfc2217.filter(data))) except socket.error as msg: - self.log.error('%s' % (msg,)) + self.log.error('{}'.format(msg)) # probably got disconnected break self.stop() diff --git a/examples/tcp_serial_redirect.py b/examples/tcp_serial_redirect.py index 97a73b4..8440296 100755 --- a/examples/tcp_serial_redirect.py +++ b/examples/tcp_serial_redirect.py @@ -2,7 +2,7 @@ # # Redirect data from a TCP/IP connection to a serial port and vice versa. # -# (C) 2002-2015 Chris Liechti <cliechti@gmx.net> +# (C) 2002-2016 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause @@ -10,6 +10,7 @@ import sys import socket import serial import serial.threaded +import time class SerialToNet(serial.threaded.Protocol): @@ -56,6 +57,12 @@ it waits for the next connect. help='suppress non error messages', default=False) + parser.add_argument( + '--develop', + action='store_true', + help='Development mode, prints Python internals on errors', + default=False) + group = parser.add_argument_group('serial port') group.add_argument( @@ -91,12 +98,20 @@ it waits for the next connect. group = parser.add_argument_group('network settings') - group.add_argument( + exclusive_group = group.add_mutually_exclusive_group() + + exclusive_group.add_argument( '-P', '--localport', type=int, help='local TCP port', default=7777) + exclusive_group.add_argument( + '-c', '--client', + metavar='HOST:PORT', + help='make the connection as a client, instead of running a server', + default=False) + args = parser.parse_args() # connect to serial port @@ -127,15 +142,40 @@ it waits for the next connect. serial_worker = serial.threaded.ReaderThread(ser, ser_to_net) serial_worker.start() - srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - srv.bind(('', args.localport)) - srv.listen(1) + if not args.client: + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srv.bind(('', args.localport)) + srv.listen(1) try: + intentional_exit = False while True: - sys.stderr.write('Waiting for connection on {}...\n'.format(args.localport)) - client_socket, addr = srv.accept() - sys.stderr.write('Connected by {}\n'.format(addr)) + if args.client: + host, port = args.client.split(':') + sys.stderr.write("Opening connection to {}:{}...\n".format(host, port)) + client_socket = socket.socket() + try: + client_socket.connect((host, int(port))) + except socket.error as msg: + sys.stderr.write('WARNING: {}\n'.format(msg)) + time.sleep(5) # intentional delay on reconnection as client + continue + sys.stderr.write('Connected\n') + client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + #~ client_socket.settimeout(5) + else: + sys.stderr.write('Waiting for connection on {}...\n'.format(args.localport)) + client_socket, addr = srv.accept() + sys.stderr.write('Connected by {}\n'.format(addr)) + # More quickly detect bad clients who quit without closing the + # connection: After 1 second of idle, start sending TCP keep-alive + # packets every 1 second. If 3 consecutive keep-alive packets + # fail, assume the client is gone and close the connection. + client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1) + client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1) + client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3) + client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) try: ser_to_net.socket = client_socket # enter network <-> serial loop @@ -146,15 +186,24 @@ it waits for the next connect. break ser.write(data) # get a bunch of bytes and send them except socket.error as msg: - sys.stderr.write('ERROR: %s\n' % msg) + if args.develop: + raise + sys.stderr.write('ERROR: {}\n'.format(msg)) # probably got disconnected break + except KeyboardInterrupt: + intentional_exit = True + raise except socket.error as msg: + if args.develop: + raise sys.stderr.write('ERROR: {}\n'.format(msg)) finally: ser_to_net.socket = None sys.stderr.write('Disconnected\n') client_socket.close() + if args.client and not intentional_exit: + time.sleep(5) # intentional delay on reconnection as client except KeyboardInterrupt: pass diff --git a/examples/wxSerialConfigDialog.py b/examples/wxSerialConfigDialog.py index a29b67d..0064a9c 100755 --- a/examples/wxSerialConfigDialog.py +++ b/examples/wxSerialConfigDialog.py @@ -99,7 +99,7 @@ class SerialConfigDialog(wx.Dialog): self.choice_port.Clear() self.ports = [] for n, (portname, desc, hwid) in enumerate(sorted(serial.tools.list_ports.comports())): - self.choice_port.Append('%s - %s' % (portname, desc)) + self.choice_port.Append(u'{} - {}'.format(portname, desc)) self.ports.append(portname) if self.serial.name == portname: preferred_index = n @@ -115,7 +115,7 @@ class SerialConfigDialog(wx.Dialog): if preferred_index is not None: self.combo_box_baudrate.SetSelection(preferred_index) else: - self.combo_box_baudrate.SetValue(u'%d' % (self.serial.baudrate,)) + self.combo_box_baudrate.SetValue(u'{}'.format(self.serial.baudrate)) if self.show & SHOW_FORMAT: # fill in data bits and select current setting self.choice_databits.Clear() diff --git a/examples/wxTerminal.py b/examples/wxTerminal.py index 4ebabb7..0811721 100755 --- a/examples/wxTerminal.py +++ b/examples/wxTerminal.py @@ -274,15 +274,15 @@ class TerminalFrame(wx.Frame): dlg.ShowModal() else: self.StartThread() - self.SetTitle("Serial Terminal on %s [%s,%s,%s,%s%s%s]" % ( - self.serial.portstr, - self.serial.baudrate, - self.serial.bytesize, - self.serial.parity, - self.serial.stopbits, - ' RTS/CTS' if self.serial.rtscts else '', - ' Xon/Xoff' if self.serial.xonxoff else '', - )) + self.SetTitle("Serial Terminal on {} [{},{},{},{}{}{}]".format( + self.serial.portstr, + self.serial.baudrate, + self.serial.bytesize, + self.serial.parity, + self.serial.stopbits, + ' RTS/CTS' if self.serial.rtscts else '', + ' Xon/Xoff' if self.serial.xonxoff else '', + )) ok = True else: # on startup, dialog aborted diff --git a/serial/rfc2217.py b/serial/rfc2217.py index 5e3cbe3..dee5c2b 100644 --- a/serial/rfc2217.py +++ b/serial/rfc2217.py @@ -73,7 +73,8 @@ except ImportError: import queue as Queue import serial -from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, portNotOpenError +from serial.serialutil import SerialBase, SerialException, to_bytes, \ + iterbytes, portNotOpenError, Timeout # port string is expected to be something like this: # rfc2217://host:port @@ -350,8 +351,8 @@ class TelnetSubnegotiation(object): can also throw a value error when the answer from the server does not match the value sent. """ - timeout_time = time.time() + timeout - while time.time() < timeout_time: + timeout_timer = Timeout(timeout) + while not timeout_timer.expired(): time.sleep(0.05) # prevent 100% CPU load if self.is_ready(): break @@ -384,7 +385,7 @@ class Serial(SerialBase): self._socket = None self._linestate = 0 self._modemstate = None - self._modemstate_expires = 0 + self._modemstate_timeout = Timeout(-1) self._remote_suspend_flow = False self._write_lock = None self.logger = None @@ -453,7 +454,7 @@ class Serial(SerialBase): # cache for line and modem states that the server sends to us self._linestate = 0 self._modemstate = None - self._modemstate_expires = 0 + self._modemstate_timeout = Timeout(-1) # RFC 2217 flow control between server and client self._remote_suspend_flow = False @@ -469,8 +470,8 @@ class Serial(SerialBase): if option.state is REQUESTED: self.telnet_send_option(option.send_yes, option.option) # now wait until important options are negotiated - timeout_time = time.time() + self._network_timeout - while time.time() < timeout_time: + timeout = Timeout(self._network_timeout) + while not timeout.expired(): time.sleep(0.05) # prevent 100% CPU load if sum(o.active for o in mandadory_options) == sum(o.state != INACTIVE for o in mandadory_options): break @@ -518,8 +519,8 @@ class Serial(SerialBase): items = self._rfc2217_port_settings.values() if self.logger: self.logger.debug("Negotiating settings: {}".format(items)) - timeout_time = time.time() + self._network_timeout - while time.time() < timeout_time: + timeout = Timeout(self._network_timeout) + while not timeout.expired(): time.sleep(0.05) # prevent 100% CPU load if sum(o.active for o in items) == len(items): break @@ -822,7 +823,7 @@ class Serial(SerialBase): if self.logger: self.logger.info("NOTIFY_MODEMSTATE: {}".format(self._modemstate)) # update time when we think that a poll would make sense - self._modemstate_expires = time.time() + 0.3 + self._modemstate_timeout.restart(0.3) elif suboption[1:2] == FLOWCONTROL_SUSPEND: self._remote_suspend_flow = True elif suboption[1:2] == FLOWCONTROL_RESUME: @@ -893,17 +894,17 @@ class Serial(SerialBase): etc.) """ # active modem state polling enabled? is the value fresh enough? - if self._poll_modem_state and self._modemstate_expires < time.time(): + if self._poll_modem_state and self._modemstate_timeout.expired(): if self.logger: self.logger.debug('polling modem state') # when it is older, request an update self.rfc2217_send_subnegotiation(NOTIFY_MODEMSTATE) - timeout_time = time.time() + self._network_timeout - while time.time() < timeout_time: + timeout = Timeout(self._network_timeout) + while not timeout.expired(): time.sleep(0.05) # prevent 100% CPU load # when expiration time is updated, it means that there is a new # value - if self._modemstate_expires > time.time(): + if not self._modemstate_timeout.expired(): break else: if self.logger: diff --git a/serial/serialposix.py b/serial/serialposix.py index 913b643..01848e9 100644 --- a/serial/serialposix.py +++ b/serial/serialposix.py @@ -34,10 +34,10 @@ import select import struct import sys import termios -import time import serial -from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError +from serial.serialutil import SerialBase, SerialException, to_bytes, \ + portNotOpenError, writeTimeoutError, Timeout class PlatformSpecificBase(object): @@ -49,6 +49,11 @@ class PlatformSpecificBase(object): def _set_rs485_mode(self, rs485_settings): raise NotImplementedError('RS485 not supported on this platform') + +# some systems support an extra flag to enable the two in POSIX unsupported +# paritiy settings for MARK and SPACE +CMSPAR = 0 # default, for unsupported platforms, override below + # try to detect the OS so that a device can be selected... # this code block should supply a device() and set_special_baudrate() function # for the platform @@ -57,6 +62,9 @@ plat = sys.platform.lower() if plat[:5] == 'linux': # Linux (confirmed) # noqa import array + # extra termios flags + CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity + # baudrate ioctls TCGETS2 = 0x802C542A TCSETS2 = 0x402C542B @@ -181,6 +189,21 @@ elif plat[:6] == 'darwin': # OS X buf = array.array('i', [baudrate]) fcntl.ioctl(self.fd, IOSSIOSPEED, buf, 1) +elif plat[:3] == 'bsd' or \ + plat[:7] == 'freebsd' or \ + plat[:6] == 'netbsd' or \ + plat[:7] == 'openbsd': + + class ReturnBaudrate(object): + def __getitem__(self, key): + return key + + class PlatformSpecific(PlatformSpecificBase): + # Only tested on FreeBSD: + # The baud rate may be passed in as + # a literal value. + BAUDRATE_CONSTANTS = ReturnBaudrate() + else: class PlatformSpecific(PlatformSpecificBase): pass @@ -220,8 +243,6 @@ TIOCM_DTR_str = struct.pack('I', TIOCM_DTR) TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427) TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428) -CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity - class Serial(SerialBase, PlatformSpecific): """\ @@ -266,7 +287,8 @@ class Serial(SerialBase, PlatformSpecific): if not self._rtscts: self._update_rts_state() except IOError as e: - if e.errno in (22, 25): # ignore Invalid argument and Inappropriate ioctl + if e.errno in (errno.EINVAL, errno.ENOTTY): + # ignore Invalid argument and Inappropriate ioctl pass else: raise @@ -349,15 +371,16 @@ class Serial(SerialBase, PlatformSpecific): # setup parity iflag &= ~(termios.INPCK | termios.ISTRIP) if self._parity == serial.PARITY_NONE: - cflag &= ~(termios.PARENB | termios.PARODD) + cflag &= ~(termios.PARENB | termios.PARODD | CMSPAR) elif self._parity == serial.PARITY_EVEN: - cflag &= ~(termios.PARODD) + cflag &= ~(termios.PARODD | CMSPAR) cflag |= (termios.PARENB) elif self._parity == serial.PARITY_ODD: + cflag &= ~CMSPAR cflag |= (termios.PARENB | termios.PARODD) - elif self._parity == serial.PARITY_MARK and plat[:5] == 'linux': + elif self._parity == serial.PARITY_MARK and CMSPAR: cflag |= (termios.PARENB | CMSPAR | termios.PARODD) - elif self._parity == serial.PARITY_SPACE and plat[:5] == 'linux': + elif self._parity == serial.PARITY_SPACE and CMSPAR: cflag |= (termios.PARENB | CMSPAR) cflag &= ~(termios.PARODD) else: @@ -443,11 +466,10 @@ class Serial(SerialBase, PlatformSpecific): if not self.is_open: raise portNotOpenError read = bytearray() - timeout = self._timeout + timeout = Timeout(self._timeout) while len(read) < size: try: - start_time = time.time() - ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout) + ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout.time_left()) if self.pipe_abort_read_r in ready: os.read(self.pipe_abort_read_r, 1000) break @@ -479,10 +501,8 @@ class Serial(SerialBase, PlatformSpecific): # see also http://www.python.org/dev/peps/pep-3151/#select if e[0] != errno.EAGAIN: raise SerialException('read failed: {}'.format(e)) - if timeout is not None: - timeout -= time.time() - start_time - if timeout <= 0: - break + if timeout.expired(): + break return bytes(read) def cancel_read(self): @@ -497,30 +517,27 @@ class Serial(SerialBase, PlatformSpecific): raise portNotOpenError d = to_bytes(data) tx_len = len(d) - timeout = self._write_timeout - if timeout and timeout > 0: # Avoid comparing None with zero - timeout += time.time() + timeout = Timeout(self._write_timeout) while tx_len > 0: try: n = os.write(self.fd, d) - if timeout == 0: + if timeout.is_non_blocking: # Zero timeout indicates non-blocking - simply return the # number of bytes of data actually written return n - elif timeout and timeout > 0: # Avoid comparing None with zero + elif not timeout.is_infinite: # when timeout is set, use select to wait for being ready # with the time left as timeout - timeleft = timeout - time.time() - if timeleft < 0: + if timeout.expired(): raise writeTimeoutError - abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeleft) + abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left()) if abort: os.read(self.pipe_abort_write_r, 1000) break if not ready: raise writeTimeoutError else: - assert timeout is None + assert timeout.time_left() is None # wait for write operation abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], None) if abort: @@ -536,7 +553,7 @@ class Serial(SerialBase, PlatformSpecific): if v.errno != errno.EAGAIN: raise SerialException('write failed: {}'.format(v)) # still calculate and check timeout - if timeout and timeout - time.time() < 0: + if timeout.expired(): raise writeTimeoutError return len(data) @@ -730,6 +747,9 @@ class VTIMESerial(Serial): if self._inter_byte_timeout is not None: vmin = 1 vtime = int(self._inter_byte_timeout * 10) + elif self._timeout is None: + vmin = 1 + vtime = 0 else: vmin = 0 vtime = int(self._timeout * 10) @@ -764,3 +784,6 @@ class VTIMESerial(Serial): break read.extend(buf) return bytes(read) + + # hack to make hasattr return false + cancel_read = property() diff --git a/serial/serialutil.py b/serial/serialutil.py index 474b4c2..636a10c 100644 --- a/serial/serialutil.py +++ b/serial/serialutil.py @@ -3,7 +3,7 @@ # Base class and support functions used by various backends. # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2001-2015 Chris Liechti <cliechti@gmx.net> +# (C) 2001-2016 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause @@ -62,14 +62,9 @@ def to_bytes(seq): elif isinstance(seq, unicode): raise TypeError('unicode strings are not supported, please encode to bytes: {!r}'.format(seq)) else: - b = bytearray() - for item in seq: - # this one handles int and bytes in Python 2.7 - # add conversion in case of Python 3.x - if isinstance(item, bytes): - item = ord(item) - b.append(item) - return bytes(b) + # handle list of integers and bytes (one or more items) for Python 2 and 3 + return bytes(bytearray(seq)) + # create control bytes XON = to_bytes([17]) @@ -104,6 +99,65 @@ writeTimeoutError = SerialTimeoutException('Write timeout') portNotOpenError = SerialException('Attempting to use a port that is not open') +class Timeout(object): + """\ + Abstraction for timeout operations. Using time.monotonic() if available + or time.time() in all other cases. + + The class can also be initialized with 0 or None, in order to support + non-blocking and fully blocking I/O operations. The attributes + is_non_blocking and is_infinite are set accordingly. + """ + if hasattr(time, 'monotonic'): + # Timeout implementation with time.monotonic(). This function is only + # supported by Python 3.3 and above. It returns a time in seconds + # (float) just as time.time(), but is not affected by system clock + # adjustments. + TIME = time.monotonic + else: + # Timeout implementation with time.time(). This is compatible with all + # Python versions but has issues if the clock is adjusted while the + # timeout is running. + TIME = time.time + + def __init__(self, duration): + """Initialize a timeout with given duration""" + self.is_infinite = (duration is None) + self.is_non_blocking = (duration == 0) + self.duration = duration + if duration is not None: + self.target_time = self.TIME() + duration + else: + self.target_time = None + + def expired(self): + """Return a boolean, telling if the timeout has expired""" + return self.target_time is not None and self.time_left() <= 0 + + def time_left(self): + """Return how many seconds are left until the timeout expires""" + if self.is_non_blocking: + return 0 + elif self.is_infinite: + return None + else: + delta = self.target_time - self.TIME() + if delta > self.duration: + # clock jumped, recalculate + self.target_time = self.TIME() + self.duration + return self.duration + else: + return max(0, delta) + + def restart(self, duration): + """\ + Restart a timeout, only supported if a timeout was already set up + before. + """ + self.duration = duration + self.target_time = self.TIME() + duration + + class SerialBase(io.RawIOBase): """\ Serial port base class. Provides __init__ function and properties to diff --git a/serial/serialwin32.py b/serial/serialwin32.py index 484c4a1..b275ea3 100644 --- a/serial/serialwin32.py +++ b/serial/serialwin32.py @@ -309,18 +309,29 @@ class Serial(SerialBase): if data: #~ win32event.ResetEvent(self._overlapped_write.hEvent) n = win32.DWORD() - err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) - if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: - raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) + success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) + if not success and win32.GetLastError() != win32.ERROR_IO_PENDING: + raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) + # Wait for the write to complete. #~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE) - err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) + win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED: return n.value # canceled IO is no error if n.value != len(data): raise writeTimeoutError - return n.value + return n.value + else: + errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError() + if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY, + win32.ERROR_OPERATION_ABORTED): + return 0 + elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): + # no info on true length provided by OS function in async mode + return len(data) + else: + raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) else: return 0 @@ -332,7 +343,7 @@ class Serial(SerialBase): while self.out_waiting: time.sleep(0.05) # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would - # require overlapped IO and its also only possible to set a single mask + # require overlapped IO and it's also only possible to set a single mask # on the port--- def reset_input_buffer(self): @@ -405,7 +416,7 @@ class Serial(SerialBase): def set_buffer_size(self, rx_size=4096, tx_size=None): """\ Recommend a buffer size to the driver (device driver can ignore this - value). Must be called before the port is opended. + value). Must be called before the port is opened. """ if tx_size is None: tx_size = rx_size diff --git a/serial/threaded/__init__.py b/serial/threaded/__init__.py index 325d4a3..74b6924 100644 --- a/serial/threaded/__init__.py +++ b/serial/threaded/__init__.py @@ -3,7 +3,7 @@ # Working with threading and pySerial # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2015 Chris Liechti <cliechti@gmx.net> +# (C) 2015-2016 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause """\ @@ -102,10 +102,10 @@ class FramedPacket(Protocol): self.in_packet = True elif byte == self.STOP: self.in_packet = False - self.handle_packet(self.packet) + self.handle_packet(bytes(self.packet)) # make read-only copy del self.packet[:] elif self.in_packet: - self.packet.append(byte) + self.packet.extend(byte) else: self.handle_out_of_packet_data(byte) @@ -273,7 +273,7 @@ if __name__ == '__main__': self.write_line('hello world') def handle_line(self, data): - sys.stdout.write('line received: {}\n'.format(repr(data))) + sys.stdout.write('line received: {!r}\n'.format(data)) def connection_lost(self, exc): if exc: diff --git a/serial/tools/list_ports_common.py b/serial/tools/list_ports_common.py index e5935c9..df12939 100644 --- a/serial/tools/list_ports_common.py +++ b/serial/tools/list_ports_common.py @@ -55,8 +55,8 @@ class ListPortInfo(object): def usb_info(self): """return a string with USB related information about device""" return 'USB VID:PID={:04X}:{:04X}{}{}'.format( - self.vid, - self.pid, + self.vid or 0, + self.pid or 0, ' SER={}'.format(self.serial_number) if self.serial_number is not None else '', ' LOCATION={}'.format(self.location) if self.location is not None else '') diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py index a2922f8..a070559 100644 --- a/serial/tools/list_ports_windows.py +++ b/serial/tools/list_ports_windows.py @@ -17,7 +17,6 @@ from ctypes.wintypes import DWORD from ctypes.wintypes import WORD from ctypes.wintypes import LONG from ctypes.wintypes import ULONG -from ctypes.wintypes import LPCSTR from ctypes.wintypes import HKEY from ctypes.wintypes import BYTE import serial @@ -30,11 +29,12 @@ def ValidHandle(value, func, arguments): raise ctypes.WinError() return value + NULL = 0 HDEVINFO = ctypes.c_void_p -PCTSTR = ctypes.c_char_p -PTSTR = ctypes.c_void_p -CHAR = ctypes.c_char +LPCTSTR = ctypes.c_wchar_p +PCTSTR = ctypes.c_wchar_p +PTSTR = ctypes.c_wchar_p LPDWORD = PDWORD = ctypes.POINTER(DWORD) #~ LPBYTE = PBYTE = ctypes.POINTER(BYTE) LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types @@ -43,20 +43,6 @@ ACCESS_MASK = DWORD REGSAM = ACCESS_MASK -def byte_buffer(length): - """Get a buffer for a string""" - return (BYTE * length)() - - -def string(buffer): - s = [] - for c in buffer: - if c == 0: - break - s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned - return ''.join(s) - - class GUID(ctypes.Structure): _fields_ = [ ('Data1', DWORD), @@ -86,6 +72,7 @@ class SP_DEVINFO_DATA(ctypes.Structure): def __str__(self): return "ClassGuid:{} DevInst:{}".format(self.ClassGuid, self.DevInst) + PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA) PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p @@ -95,7 +82,7 @@ SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO] SetupDiDestroyDeviceInfoList.restype = BOOL -SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA +SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameW SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD] SetupDiClassGuidsFromName.restype = BOOL @@ -103,16 +90,16 @@ SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA] SetupDiEnumDeviceInfo.restype = BOOL -SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA +SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsW SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD] SetupDiGetClassDevs.restype = HDEVINFO SetupDiGetClassDevs.errcheck = ValidHandle -SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA +SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyW SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD] SetupDiGetDeviceRegistryProperty.restype = BOOL -SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA +SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdW SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD] SetupDiGetDeviceInstanceId.restype = BOOL @@ -125,8 +112,8 @@ RegCloseKey = advapi32.RegCloseKey RegCloseKey.argtypes = [HKEY] RegCloseKey.restype = LONG -RegQueryValueEx = advapi32.RegQueryValueExA -RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD] +RegQueryValueEx = advapi32.RegQueryValueExW +RegQueryValueEx.argtypes = [HKEY, LPCTSTR , LPDWORD, LPDWORD, LPBYTE, LPDWORD] RegQueryValueEx.restype = LONG @@ -141,17 +128,13 @@ DICS_FLAG_GLOBAL = 1 DIREG_DEV = 0x00000001 KEY_READ = 0x20019 -# workaround for compatibility between Python 2.x and 3.x -Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports" -PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName" - def iterate_comports(): """Return a generator that yields descriptions for serial ports""" GUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough... guids_size = DWORD() if not SetupDiClassGuidsFromName( - Ports, + "Ports", GUIDs, ctypes.sizeof(GUIDs), ctypes.byref(guids_size)): @@ -179,11 +162,11 @@ def iterate_comports(): 0, DIREG_DEV, # DIREG_DRV for SW info KEY_READ) - port_name_buffer = byte_buffer(250) + port_name_buffer = ctypes.create_unicode_buffer(250) port_name_length = ULONG(ctypes.sizeof(port_name_buffer)) RegQueryValueEx( hkey, - PortName, + "PortName", None, None, ctypes.byref(port_name_buffer), @@ -193,16 +176,17 @@ def iterate_comports(): # unfortunately does this method also include parallel ports. # we could check for names starting with COM or just exclude LPT # and hope that other "unknown" names are serial ports... - if string(port_name_buffer).startswith('LPT'): + if port_name_buffer.value.startswith('LPT'): continue # hardware ID - szHardwareID = byte_buffer(250) + szHardwareID = ctypes.create_unicode_buffer(250) # try to get ID that includes serial number if not SetupDiGetDeviceInstanceId( g_hdi, ctypes.byref(devinfo), - ctypes.byref(szHardwareID), + #~ ctypes.byref(szHardwareID), + szHardwareID, ctypes.sizeof(szHardwareID) - 1, None): # fall back to more generic hardware ID if that would fail @@ -218,21 +202,22 @@ def iterate_comports(): if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: raise ctypes.WinError() # stringify - szHardwareID_str = string(szHardwareID) + szHardwareID_str = szHardwareID.value - info = list_ports_common.ListPortInfo(string(port_name_buffer)) + info = list_ports_common.ListPortInfo(port_name_buffer.value) # in case of USB, make a more readable string, similar to that form # that we also generate on other platforms if szHardwareID_str.startswith('USB'): - m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I) + m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I) if m: info.vid = int(m.group(1), 16) - info.pid = int(m.group(2), 16) - if m.group(4): - info.serial_number = m.group(4) + if m.group(3): + info.pid = int(m.group(3), 16) + if m.group(5): + info.serial_number = m.group(5) # calculate a location string - loc_path_str = byte_buffer(250) + loc_path_str = ctypes.create_unicode_buffer(250) if SetupDiGetDeviceRegistryProperty( g_hdi, ctypes.byref(devinfo), @@ -241,8 +226,7 @@ def iterate_comports(): ctypes.byref(loc_path_str), ctypes.sizeof(loc_path_str) - 1, None): - #~ print (string(loc_path_str)) - m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str)) + m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', loc_path_str.value) location = [] for g in m: if g.group(1): @@ -269,7 +253,7 @@ def iterate_comports(): info.hwid = szHardwareID_str # friendly name - szFriendlyName = byte_buffer(250) + szFriendlyName = ctypes.create_unicode_buffer(250) if SetupDiGetDeviceRegistryProperty( g_hdi, ctypes.byref(devinfo), @@ -279,7 +263,7 @@ def iterate_comports(): ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, None): - info.description = string(szFriendlyName) + info.description = szFriendlyName.value #~ else: # Ignore ERROR_INSUFFICIENT_BUFFER #~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py index 145996f..7c68e9d 100644 --- a/serial/tools/miniterm.py +++ b/serial/tools/miniterm.py @@ -169,7 +169,7 @@ elif os.name == 'posix': return c def cancel(self): - os.write(self.pipe_w, "x") + os.write(self.pipe_w, b"x") def cleanup(self): termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) @@ -553,7 +553,7 @@ class Miniterm(object): if new_filters: for f in new_filters: if f not in TRANSFORMATIONS: - sys.stderr.write('--- unknown filter: {}'.format(repr(f))) + sys.stderr.write('--- unknown filter: {}\n'.format(repr(f))) break else: self.filters = new_filters diff --git a/serial/urlhandler/protocol_loop.py b/serial/urlhandler/protocol_loop.py index 819da77..7bf6cf9 100644 --- a/serial/urlhandler/protocol_loop.py +++ b/serial/urlhandler/protocol_loop.py @@ -43,10 +43,11 @@ class Serial(SerialBase): 9600, 19200, 38400, 57600, 115200) def __init__(self, *args, **kwargs): - super(Serial, self).__init__(*args, **kwargs) self.buffer_size = 4096 self.queue = None self.logger = None + self._cancel_write = False + super(Serial, self).__init__(*args, **kwargs) def open(self): """\ @@ -151,7 +152,7 @@ class Serial(SerialBase): if self._timeout == 0: break else: - if data is not None: + if b is not None: data += b size -= 1 else: @@ -164,12 +165,19 @@ class Serial(SerialBase): break return bytes(data) + def cancel_read(self): + self.queue.put_nowait(None) + + def cancel_write(self): + self._cancel_write = True + def write(self, data): """\ Output the given byte string over the serial port. Can block if the connection is blocked. May raise SerialException if the connection is closed. """ + self._cancel_write = False if not self.is_open: raise portNotOpenError data = to_bytes(data) @@ -178,7 +186,13 @@ class Serial(SerialBase): # when a write timeout is configured check if we would be successful # (not sending anything, not even the part that would have time) if self._write_timeout is not None and time_used_to_send > self._write_timeout: - time.sleep(self._write_timeout) # must wait so that unit test succeeds + # must wait so that unit test succeeds + time_left = self._write_timeout + while time_left > 0 and not self._cancel_write: + time.sleep(min(time_left, 0.5)) + time_left -= 0.5 + if self._cancel_write: + return 0 # XXX raise writeTimeoutError for byte in iterbytes(data): self.queue.put(byte, timeout=self._write_timeout) diff --git a/serial/win32.py b/serial/win32.py index 31f21ee..905ce0f 100644 --- a/serial/win32.py +++ b/serial/win32.py @@ -218,10 +218,14 @@ FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int EV_DSR = 16 # Variable c_int MAXDWORD = 4294967295 # Variable c_uint EV_RLSD = 32 # Variable c_int + ERROR_SUCCESS = 0 +ERROR_NOT_ENOUGH_MEMORY = 8 ERROR_OPERATION_ABORTED = 995 ERROR_IO_INCOMPLETE = 996 ERROR_IO_PENDING = 997 # Variable c_long +ERROR_INVALID_USER_BUFFER = 1784 + MS_CTS_ON = 16 # Variable c_ulong EV_EVENT1 = 2048 # Variable c_int EV_RX80FULL = 1024 # Variable c_int @@ -9,14 +9,46 @@ # (C) 2001-2016 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause +import io +import os +import re try: from setuptools import setup except ImportError: from distutils.core import setup -import serial -version = serial.VERSION + +def read(*names, **kwargs): + """Python 2 and Python 3 compatible text file reading. + + Required for single-sourcing the version string. + """ + with io.open( + os.path.join(os.path.dirname(__file__), *names), + encoding=kwargs.get("encoding", "utf8") + ) as fp: + return fp.read() + + +def find_version(*file_paths): + """ + Search the file for a version string. + + file_path contain string path components. + + Reads the supplied Python module as text without importing it. + """ + version_file = read(*file_paths) + version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", + version_file, re.M) + if version_match: + return version_match.group(1) + raise RuntimeError("Unable to find version string.") + + +version = find_version('serial', '__init__.py') + setup( name="pyserial", diff --git a/test/handlers/protocol_test.py b/test/handlers/protocol_test.py index 42ac4b2..f2e572f 100644 --- a/test/handlers/protocol_test.py +++ b/test/handlers/protocol_test.py @@ -71,9 +71,9 @@ class DummySerial(SerialBase): self.logger.setLevel(LOGGER_LEVELS[value]) self.logger.debug('enabled logging') else: - raise ValueError('unknown option: %r' % (option,)) + raise ValueError('unknown option: {!r}'.format(option)) except ValueError as e: - raise SerialException('expected a string in the form "[test://][option[/option...]]": %s' % e) + raise SerialException('expected a string in the form "[test://][option[/option...]]": {}'.format(e)) return (host, port) # - - - - - - - - - - - - - - - - - - - - - - - - @@ -120,26 +120,26 @@ class DummySerial(SerialBase): duration.""" if not self._isOpen: raise portNotOpenError if self.logger: - self.logger.info('ignored sendBreak(%r)' % (duration,)) + self.logger.info('ignored sendBreak({!r})'.format(duration)) def setBreak(self, level=True): """Set break: Controls TXD. When active, to transmitting is possible.""" if not self._isOpen: raise portNotOpenError if self.logger: - self.logger.info('ignored setBreak(%r)' % (level,)) + self.logger.info('ignored setBreak({!r})'.format(level)) def setRTS(self, level=True): """Set terminal status line: Request To Send""" if not self._isOpen: raise portNotOpenError if self.logger: - self.logger.info('ignored setRTS(%r)' % (level,)) + self.logger.info('ignored setRTS({!r})'.format(level)) def setDTR(self, level=True): """Set terminal status line: Data Terminal Ready""" if not self._isOpen: raise portNotOpenError if self.logger: - self.logger.info('ignored setDTR(%r)' % (level,)) + self.logger.info('ignored setDTR({!r})'.format(level)) def getCTS(self): """Read terminal status line: Clear To Send""" @@ -192,11 +192,11 @@ else: if __name__ == '__main__': import sys s = Serial('test://logging=debug') - sys.stdout.write('%s\n' % s) + sys.stdout.write('{}\n'.format(s)) sys.stdout.write("write...\n") s.write("hello\n") s.flush() - sys.stdout.write("read: %s\n" % s.read(5)) + sys.stdout.write("read: {}\n".format(s.read(5))) s.close() diff --git a/test/run_all_tests.py b/test/run_all_tests.py index 355cd44..e0797e7 100644 --- a/test/run_all_tests.py +++ b/test/run_all_tests.py @@ -15,10 +15,10 @@ import sys import os # inject local copy to avoid testing the installed version instead of the one in the repo -sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import serial # noqa -print("Patching sys.path to test local version. Testing Version: %s" % (serial.VERSION,)) +print("Patching sys.path to test local version. Testing Version: {}".format(serial.VERSION)) PORT = 'loop://' if len(sys.argv) > 1: @@ -34,11 +34,11 @@ for modulename in [ try: module = __import__(modulename) except ImportError: - print("skipping %s" % (modulename,)) + print("skipping {}".format(modulename)) else: module.PORT = PORT testsuite = unittest.findTestCases(module) - print("found %s tests in %r" % (testsuite.countTestCases(), modulename)) + print("found {} tests in {!r}".format(testsuite.countTestCases(), modulename)) mainsuite.addTest(testsuite) verbosity = 1 diff --git a/test/test.py b/test/test.py index 8b38c8c..db03907 100644 --- a/test/test.py +++ b/test/test.py @@ -66,7 +66,7 @@ class Test4_Nonblocking(unittest.TestCase): self.s.write(block) # there might be a small delay until the character is ready (especially on win32) time.sleep(0.05) - self.assertEqual(self.s.in_waiting, length, "expected exactly %d character for inWainting()" % length) + self.assertEqual(self.s.in_waiting, length, "expected exactly {} character for inWainting()".format(length)) self.assertEqual(self.s.read(length), block) #, "expected a %r which was written before" % block) self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read") @@ -131,7 +131,7 @@ class Test1_Forever(unittest.TestCase): a character is sent after some time to terminate the test (SendEvent).""" c = self.s.read(1) if not (self.event.isSet() and c == b'E'): - self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c)) + self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), c)) class Test2_Forever(unittest.TestCase): @@ -220,14 +220,14 @@ class Test_MoreTimeouts(unittest.TestCase): t1 = time.time() self.assertRaises(serial.SerialTimeoutException, self.s.write, b"timeout please" * 200) t2 = time.time() - self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval (%s)" % (t2 - t1)) + self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval ({})".format(t2 - t1)) if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_advanced.py b/test/test_advanced.py index 19559b2..527cc47 100644 --- a/test/test_advanced.py +++ b/test/test_advanced.py @@ -23,7 +23,7 @@ import unittest import serial # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' class Test_ChangeAttributes(unittest.TestCase): @@ -38,41 +38,19 @@ class Test_ChangeAttributes(unittest.TestCase): def test_PortSetting(self): self.s.port = PORT - # portstr has to be set - if isinstance(PORT, str): - self.assertEqual(self.s.portstr.lower(), PORT.lower()) - else: - self.assertEqual(self.s.portstr, serial.device(PORT)) + self.assertEqual(self.s.portstr.lower(), PORT.lower()) # test internals self.assertEqual(self.s._port, PORT) # test on the fly change self.s.open() self.assertTrue(self.s.isOpen()) - #~ try: - #~ self.s.port = 0 - #~ except serial.SerialException: # port not available on system - #~ pass # can't test on this machine... - #~ else: - #~ self.failUnless(self.s.isOpen()) - #~ self.failUnlessEqual(self.s.port, 0) - #~ self.failUnlessEqual(self.s.portstr, serial.device(0)) - #~ try: - #~ self.s.port = 1 - #~ except serial.SerialException: # port not available on system - #~ pass # can't test on this machine... - #~ else: - #~ self.failUnless(self.s.isOpen()) - #~ self.failUnlessEqual(self.s.port, 1) - #~ self.failUnlessEqual(self.s.portstr, serial.device(1)) def test_DoubleOpen(self): - self.s.port = PORT self.s.open() # calling open for a second time is an error self.assertRaises(serial.SerialException, self.s.open) def test_BaudrateSetting(self): - self.s.port = PORT self.s.open() for baudrate in (300, 9600, 19200, 115200): self.s.baudrate = baudrate @@ -88,7 +66,6 @@ class Test_ChangeAttributes(unittest.TestCase): # therefore the test can not choose a value that fails on any system. def disabled_test_BaudrateSetting2(self): # test illegal values, depending on machine/port some of these may be valid... - self.s.port = PORT self.s.open() for illegal_value in (500000, 576000, 921600, 92160): self.assertRaises(ValueError, setattr, self.s, 'baudrate', illegal_value) @@ -164,7 +141,6 @@ class Test_ChangeAttributes(unittest.TestCase): self.assertRaises(serial.SerialException, self.s.open) def test_PortOpenClose(self): - self.s.port = PORT for i in range(3): # open the port and check flag self.assertTrue(not self.s.isOpen()) @@ -179,7 +155,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_asyncio.py b/test/test_asyncio.py index 99548b3..5df8ef2 100644 --- a/test/test_asyncio.py +++ b/test/test_asyncio.py @@ -76,7 +76,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_cancel.py b/test/test_cancel.py index 210891b..daab1ce 100644 --- a/test/test_cancel.py +++ b/test/test_cancel.py @@ -45,7 +45,7 @@ class TestCancelRead(unittest.TestCase): self.s.read(1000) t2 = time.time() self.assertEqual(self.cancel_called, 1) - self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2 - t1)) + self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1)) #~ self.assertTrue(not self.s.isOpen()) #~ self.assertRaises(serial.SerialException, self.s.open) @@ -89,7 +89,7 @@ class TestCancelWrite(unittest.TestCase): self.s.write(DATA) t2 = time.time() self.assertEqual(self.cancel_called, 1) - self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2 - t1)) + self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1)) #~ self.assertTrue(not self.s.isOpen()) #~ self.assertRaises(serial.SerialException, self.s.open) @@ -103,7 +103,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_high_load.py b/test/test_high_load.py index 48ec9f3..b0bd773 100644 --- a/test/test_high_load.py +++ b/test/test_high_load.py @@ -25,7 +25,7 @@ import sys import serial # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' BAUDRATE = 115200 #~ BAUDRATE=9600 @@ -61,7 +61,7 @@ class TestHighLoad(unittest.TestCase): for i in range(self.N): self.s.write(q) read = self.s.read(len(q) * self.N) - self.assertEqual(read, q * self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N * len(q))) + self.assertEqual(read, q * self.N, "expected what was written before. got {} bytes, expected {}".format(len(read), self.N * len(q))) self.assertEqual(self.s.inWaiting(), 0) # "expected empty buffer after all sent chars are read") @@ -70,7 +70,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_iolib.py b/test/test_iolib.py index 71c7db6..84e3fa2 100644 --- a/test/test_iolib.py +++ b/test/test_iolib.py @@ -29,17 +29,8 @@ import sys import unittest import serial -# trick to make that this test run under 2.6 and 3.x without modification. -# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't -# like u'nicode' strings with the prefix and it is not providing an unicode -# function ('str' is now what 'unicode' used to be) -if sys.version_info >= (3, 0): - def unicode(x): - return x - - # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' class Test_SerialAndIO(unittest.TestCase): @@ -64,7 +55,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_readline.py b/test/test_readline.py index ac0c813..34b807b 100644 --- a/test/test_readline.py +++ b/test/test_readline.py @@ -28,7 +28,7 @@ import serial #~ print serial.VERSION # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' if sys.version_info >= (3, 0): def data(string): @@ -98,7 +98,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_rs485.py b/test/test_rs485.py index 1d7ed09..e918f67 100644 --- a/test/test_rs485.py +++ b/test/test_rs485.py @@ -13,7 +13,7 @@ import serial import serial.rs485 # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' class Test_RS485_settings(unittest.TestCase): @@ -43,6 +43,8 @@ class Test_RS485_class(unittest.TestCase): """Test RS485 class""" def setUp(self): + if not isinstance(serial.serial_for_url(PORT), serial.Serial): + raise unittest.SkipTest("RS485 test only compatible with real serial port") self.s = serial.rs485.RS485(PORT, timeout=1) def tearDown(self): @@ -59,7 +61,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_settings_dict.py b/test/test_settings_dict.py index 12fd4c3..86ee4b2 100644 --- a/test/test_settings_dict.py +++ b/test/test_settings_dict.py @@ -15,7 +15,7 @@ import unittest import serial # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff', @@ -74,7 +74,7 @@ if __name__ == '__main__': sys.stdout.write(__doc__) if len(sys.argv) > 1: PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) + sys.stdout.write("Testing port: {!r}\n".format(PORT)) sys.argv[1:] = ['-v'] # When this module is executed from the command-line, it runs all its tests unittest.main() diff --git a/test/test_threaded.py b/test/test_threaded.py new file mode 100644 index 0000000..40f45ad --- /dev/null +++ b/test/test_threaded.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# This file is part of pySerial - Cross platform serial port support for Python +# (C) 2016 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause +"""\ +Test serial.threaded related functionality. +""" + +import os +import unittest +import serial +import serial.threaded +import time + + +# on which port should the tests be performed: +PORT = 'loop://' + +class Test_threaded(unittest.TestCase): + """Test serial.threaded related functionality""" + + def test_line_reader(self): + """simple test of line reader class""" + + class TestLines(serial.threaded.LineReader): + def __init__(self): + super(TestLines, self).__init__() + self.received_lines = [] + + def handle_line(self, data): + self.received_lines.append(data) + + ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) + with serial.threaded.ReaderThread(ser, TestLines) as protocol: + protocol.write_line('hello') + protocol.write_line('world') + time.sleep(1) + self.assertEqual(protocol.received_lines, ['hello', 'world']) + + def test_framed_packet(self): + """simple test of line reader class""" + + class TestFramedPacket(serial.threaded.FramedPacket): + def __init__(self): + super(TestFramedPacket, self).__init__() + self.received_packets = [] + + def handle_packet(self, packet): + self.received_packets.append(packet) + + def send_packet(self, packet): + self.transport.write(self.START) + self.transport.write(packet) + self.transport.write(self.STOP) + + ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) + with serial.threaded.ReaderThread(ser, TestFramedPacket) as protocol: + protocol.send_packet(b'1') + protocol.send_packet(b'2') + protocol.send_packet(b'3') + time.sleep(1) + self.assertEqual(protocol.received_packets, [b'1', b'2', b'3']) + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.stdout.write("Testing port: {!r}\n".format(PORT)) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_timeout_class.py b/test/test_timeout_class.py new file mode 100644 index 0000000..37c38b1 --- /dev/null +++ b/test/test_timeout_class.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +# +# This file is part of pySerial - Cross platform serial port support for Python +# (C) 2016 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause +""" +Test Timeout helper class. +""" +import sys +import unittest +import time +from serial import serialutil + + +class TestTimeoutClass(unittest.TestCase): + """Test the Timeout class""" + + def test_simple_timeout(self): + """Test simple timeout""" + t = serialutil.Timeout(2) + self.assertFalse(t.expired()) + self.assertTrue(t.time_left() > 0) + time.sleep(2.1) + self.assertTrue(t.expired()) + self.assertEqual(t.time_left(), 0) + + def test_non_blocking(self): + """Test nonblocking case (0)""" + t = serialutil.Timeout(0) + self.assertTrue(t.is_non_blocking) + self.assertFalse(t.is_infinite) + self.assertTrue(t.expired()) + + def test_blocking(self): + """Test no timeout (None)""" + t = serialutil.Timeout(None) + self.assertFalse(t.is_non_blocking) + self.assertTrue(t.is_infinite) + #~ self.assertFalse(t.expired()) + + def test_changing_clock(self): + """Test recovery from chaning clock""" + class T(serialutil.Timeout): + def TIME(self): + return test_time + test_time = 1000 + t = T(10) + self.assertEqual(t.target_time, 1010) + self.assertFalse(t.expired()) + self.assertTrue(t.time_left() > 0) + test_time = 100 # clock jumps way back + self.assertTrue(t.time_left() > 0) + self.assertTrue(t.time_left() <= 10) + self.assertEqual(t.target_time, 110) + test_time = 10000 # jump way forward + self.assertEqual(t.time_left(), 0) # if will expire immediately + + +if __name__ == '__main__': + sys.stdout.write(__doc__) + if len(sys.argv) > 1: + PORT = sys.argv[1] + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/test/test_util.py b/test/test_util.py new file mode 100644 index 0000000..5bf8e60 --- /dev/null +++ b/test/test_util.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# +# This file is part of pySerial - Cross platform serial port support for Python +# (C) 2016 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause +"""\ +Tests for utility functions of serualutil. +""" + +import os +import unittest +import serial + + +class Test_util(unittest.TestCase): + """Test serial utility functions""" + + def test_to_bytes(self): + self.assertEqual(serial.to_bytes([1, 2, 3]), b'\x01\x02\x03') + self.assertEqual(serial.to_bytes(b'\x01\x02\x03'), b'\x01\x02\x03') + self.assertEqual(serial.to_bytes(bytearray([1,2,3])), b'\x01\x02\x03') + # unicode is not supported test. use decode() instead of u'' syntax to be + # compatible to Python 3.x < 3.4 + self.assertRaises(TypeError, serial.to_bytes, b'hello'.decode('utf-8')) + + def test_iterbytes(self): + self.assertEqual(list(serial.iterbytes(b'\x01\x02\x03')), [b'\x01', b'\x02', b'\x03']) + + +if __name__ == '__main__': + import sys + sys.stdout.write(__doc__) + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() |