summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml2
-rw-r--r--CHANGES.rst20
-rw-r--r--documentation/appendix.rst24
-rw-r--r--documentation/pyserial.rst78
-rw-r--r--documentation/pyserial_api.rst106
-rw-r--r--documentation/url_handlers.rst2
-rw-r--r--examples/at_protocol.py10
-rwxr-xr-xexamples/port_publisher.py26
-rwxr-xr-xexamples/rfc2217_server.py4
-rwxr-xr-xexamples/tcp_serial_redirect.py69
-rwxr-xr-xexamples/wxSerialConfigDialog.py4
-rwxr-xr-xexamples/wxTerminal.py18
-rw-r--r--serial/rfc2217.py29
-rw-r--r--serial/serialposix.py75
-rw-r--r--serial/serialutil.py72
-rw-r--r--serial/serialwin32.py25
-rw-r--r--serial/threaded/__init__.py8
-rw-r--r--serial/tools/list_ports_common.py4
-rw-r--r--serial/tools/list_ports_windows.py74
-rw-r--r--serial/tools/miniterm.py4
-rw-r--r--serial/urlhandler/protocol_loop.py20
-rw-r--r--serial/win32.py4
-rw-r--r--setup.py36
-rw-r--r--test/handlers/protocol_test.py16
-rw-r--r--test/run_all_tests.py8
-rw-r--r--test/test.py8
-rw-r--r--test/test_advanced.py30
-rw-r--r--test/test_asyncio.py2
-rw-r--r--test/test_cancel.py6
-rw-r--r--test/test_high_load.py6
-rw-r--r--test/test_iolib.py13
-rw-r--r--test/test_readline.py4
-rw-r--r--test/test_rs485.py6
-rw-r--r--test/test_settings_dict.py4
-rw-r--r--test/test_threaded.py75
-rw-r--r--test/test_timeout_class.py66
-rw-r--r--test/test_util.py36
37 files changed, 689 insertions, 305 deletions
diff --git a/.travis.yml b/.travis.yml
index 0b47c2d..80d630e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,4 +13,4 @@ python:
script:
- python setup.py install
- - python test/test.py loop://
+ - python test/run_all_tests.py loop://
diff --git a/CHANGES.rst b/CHANGES.rst
index 9166c18..e960e61 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -649,8 +649,28 @@ Bugfixes (win32):
Version 3.x.x 2016-xx-xx
--------------------------
+Improvements:
+
+- add client mode to exmaple tcp_serial_redirect.py
+- use of monotonic clock for timeouts, when available (Python 3.3 and up)
+- [#169] arbitrary baudrate support for BSD family
+- improve tests, improve ``loop://``
+
+Bugfixes:
+
+- [#137] Exception while cancel in miniterm (python3)
+- [#143] Class Serial in protocol_loop.py references variable before assigning
+ to it
+- [#149] Python 3 fix for threaded.FramedPacket
+
Bugfixes (posix):
- [#133] _update_dtr_state throws Inappropriate ioctl for virtual serial
port created by socat on OS X
+- [#157] Broken handling of CMSPAR in serialposix.py
+
+Bugfixes (win32):
+- [#144] Use Unicode API for list_ports
+- [#145] list_ports_windows: support devices with only VID
+- [#162] Write in non-blocking mode returns incorrect value on windows
diff --git a/documentation/appendix.rst b/documentation/appendix.rst
index 5d8bae0..8bc2c1a 100644
--- a/documentation/appendix.rst
+++ b/documentation/appendix.rst
@@ -5,10 +5,18 @@
How To
======
-Enable :rfc:`2217` in programs using pySerial.
- Patch the code where the :class:`serial.Serial` is instantiated. Replace
+Enable :rfc:`2217` (and other URL handlers) in programs using pySerial.
+ Patch the code where the :class:`serial.Serial` is instantiated.
+ E.g. replace::
+
+ s = serial.Serial(...)
+
it with::
+ s = serial.serial_for_url(...)
+
+ or for backwards compatibility to old pySerial installations::
+
try:
s = serial.serial_for_url(...)
except AttributeError:
@@ -33,6 +41,10 @@ Test your setup.
on the screen, then at least RX and TX work (they still could be swapped
though).
+ There is also a ``spy:://`` URL handler. It prints all calls (read/write,
+ control lines) to the serial port to a file or stderr. See :ref:`spy`
+ for details.
+
FAQ
===
@@ -71,6 +83,14 @@ User supplied URL handlers
search path in :data:`serial.protocol_handler_packages`. This is possible
starting from pySerial V2.6.
+``Permission denied`` errors
+ On POSIX based systems, the user usually needs to be in a special group to
+ have access to serial ports.
+
+ On Debian based systems, serial ports are usually in the group ``dialout``,
+ so running ``sudo adduser $USER dialout`` (and logging-out and -in) enables
+ the user to access the port.
+
Related software
================
diff --git a/documentation/pyserial.rst b/documentation/pyserial.rst
index 2a7fe42..602134d 100644
--- a/documentation/pyserial.rst
+++ b/documentation/pyserial.rst
@@ -46,54 +46,58 @@ Features
Requirements
============
-- Python 2.7 or newer, including Python 3.4 and newer
-- "Java Communications" (JavaComm) or compatible extension for Java/Jython
+- Python 2.7 or Python 3.4 and newer
+
+- If running on Windows: Something newer than WinXP
+
+- If running on Jython: "Java Communications" (JavaComm) or compatible
+ extension for Java
+
+For older installations (older Python versions or older operating systems), see
+`older versions`_ below.
Installation
============
-pyserial
---------
This installs a package that can be used from Python (``import serial``).
To install for all users on the system, administrator rights (root)
may be required.
From PyPI
-~~~~~~~~~
-pySerial can be installed from PyPI, either manually downloading the
-files and installing as described below or using::
+---------
+pySerial can be installed from PyPI::
- pip install pyserial
+ python -m pip install pyserial
-or::
+Using the `python`/`python3` executable of the desired version (2.7/3.x).
- easy_install -U pyserial
+Developers also may be interested to get the source archive, because it
+contains examples, tests and the this documentation.
-From source (tar.gz or checkout)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Download the archive from http://pypi.python.org/pypi/pyserial.
+From source (zip/tar.gz or checkout)
+------------------------------------
+Download the archive from http://pypi.python.org/pypi/pyserial or
+https://github.com/pyserial/pyserial/releases.
Unpack the archive, enter the ``pyserial-x.y`` directory and run::
python setup.py install
-For Python 3.x::
-
- python3 setup.py install
+Using the `python`/`python3` executable of the desired version (2.7/3.x).
Packages
-~~~~~~~~
-There are also packaged versions for some Linux distributions and Windows:
+--------
+There are also packaged versions for some Linux distributions:
-Debian/Ubuntu
- A package is available under the name "python-serial". Note that some
- distributions may package an older version of pySerial.
+- Debian/Ubuntu: "python-serial", "python3-serial"
+- Fedora / RHEL / CentOS / EPEL: "pyserial"
+- Arch Linux: "python-pyserial"
+- Gento: "dev-python/pyserial"
-Windows
- There is also a Windows installer for end users. It is located in the
- PyPi_. Developers also may be interested to get the source archive,
- because it contains examples, tests and the this documentation.
+Note that some distributions may package an older version of pySerial.
+These packages are created and maintained by developers working on
+these distributions.
.. _PyPi: http://pypi.python.org/pypi/pyserial
@@ -102,21 +106,25 @@ References
==========
* Python: http://www.python.org/
* Jython: http://www.jython.org/
-* Java@IBM: http://www-106.ibm.com/developerworks/java/jdk/ (JavaComm links are
- on the download page for the respective platform JDK)
-* Java@SUN: http://java.sun.com/products/
* IronPython: http://www.codeplex.com/IronPython
-* setuptools: http://peak.telecommunity.com/DevCenter/setuptools
Older Versions
==============
-Older versions are still available in the old download_ page. pySerial 1.21
-is compatible with Python 2.0 on Windows, Linux and several un*x like systems,
-MacOSX and Jython.
+Older versions are still available on the current download_ page or the `old
+download`_ page. The last version of pySerial's 2.x series was `2.7`_,
+compatible with Python 2.3 and newer and partially with early Python 3.x
+versions.
+
+pySerial `1.21`_ is compatible with Python 2.0 on Windows, Linux and several
+un*x like systems, MacOSX and Jython.
+
+On Windows, releases older than 2.5 will depend on pywin32_ (previously known as
+win32all). WinXP is supported up to 3.0.1.
-On Windows releases older than 2.5 will depend on pywin32_ (previously known as
-win32all)
-.. _download: https://pypi.python.org/pypi/pyserial
+.. _`old download`: https://sourceforge.net/projects/pyserial/files/pyserial/
+.. _download: https://pypi.python.org/simple/pyserial/
.. _pywin32: http://pypi.python.org/pypi/pywin32
+.. _`2.7`: https://pypi.python.org/pypi/pyserial/2.7
+.. _`1.21`: https://sourceforge.net/projects/pyserial/files/pyserial/1.21/pyserial-1.21.zip/download
diff --git a/documentation/pyserial_api.rst b/documentation/pyserial_api.rst
index 75e19e7..cec0078 100644
--- a/documentation/pyserial_api.rst
+++ b/documentation/pyserial_api.rst
@@ -111,7 +111,20 @@ Native ports
.. method:: open()
- Open port.
+ Open port. The state of :attr:`rts` and :attr:`dtr` is applied.
+
+ .. note::
+
+ Some OS and/or drivers may activate RTS and or DTR automatically,
+ as soon as the port is opened. There may be a glitch on RTS/DTR
+ when :attr:`rts`` or :attr:`dtr` are set differently from their
+ default value (``True`` / active).
+
+ .. note::
+
+ For compatibility reasons, no error is reported when applying
+ :attr:`rts` or :attr:`dtr` fails on POSIX due to EINVAL (22) or
+ ENOTTY (25).
.. method:: close()
@@ -226,7 +239,7 @@ Native ports
Set RTS line to specified logic level. It is possible to assign this
value before opening the serial port, then the value is applied uppon
- :meth:`open`.
+ :meth:`open` (with restrictions, see :meth:`open`).
.. attribute:: dtr
@@ -236,7 +249,7 @@ Native ports
Set DTR line to specified logic level. It is possible to assign this
value before opening the serial port, then the value is applied uppon
- :meth:`open`.
+ :meth:`open` (with restrictions, see :meth:`open`).
Read-only attributes:
@@ -275,6 +288,10 @@ Native ports
Return the state of the CD line
+ .. attribute:: is_open
+
+ :getter: Get the state of the serial port, whether it's open.
+ :type: bool
New values can be assigned to the following attributes (properties), the
port will be reconfigured, even if it's opened at that time:
@@ -476,6 +493,34 @@ Native ports
.. versionadded:: 2.5
.. versionchanged:: 3.0 renamed from ``applySettingsDict``
+
+ This class can be used as context manager. The serial port is closed when
+ the context is left.
+
+ .. method:: __enter__()
+
+ :returns: Serial instance
+
+ Returns the instance that was used in the ``with`` statement.
+
+ Example:
+
+ >>> with serial.serial_for_url(port) as s:
+ ... s.write(b'hello')
+
+ Here no port argument is given, so it is not opened automatically:
+
+ >>> with serial.Serial() as s:
+ ... s.port = ...
+ ... s.open()
+ ... s.write(b'hello')
+
+
+ .. method:: __exit__(exc_type, exc_val, exc_tb)
+
+ Closes serial port.
+
+
Platform specific methods.
.. warning:: Programs using the following methods and attributes are not
@@ -564,6 +609,10 @@ Native ports
.. deprecated:: 3.0 see :attr:`in_waiting`
+ .. method:: isOpen()
+
+ .. deprecated:: 3.0 see :attr:`is_open`
+
.. attribute:: writeTimeout
.. deprecated:: 3.0 see :attr:`write_timeout`
@@ -659,12 +708,14 @@ enable RS485 specific support on some platforms. Currently Windows and Linux
Usage::
+ import serial
+ import serial.rs485
ser = serial.Serial(...)
ser.rs485_mode = serial.rs485.RS485Settings(...)
ser.write(b'hello')
There is a subclass :class:`rs485.RS485` available to emulate the RS485 support
-on regular serial ports.
+on regular serial ports (``serial.rs485`` needs to be imported).
.. class:: rs485.RS485Settings
@@ -1202,48 +1253,13 @@ Example::
asyncio
=======
-.. module:: serial.aio
-
-.. warning:: This implementation is currently in an experimental state. Use
- at your own risk.
-
-Experimental asyncio support is available for Python 3.4 and newer. The module
-:mod:`serial.aio` provides a :class:`asyncio.Transport`:
-``SerialTransport``.
+``asyncio`` was introduced with Python 3.4. Experimental support for pySerial
+is provided via a separate distribution `pyserial-asyncio`_.
+It is currently under developement, see:
-A factory function (`asyncio.coroutine`) is provided:
+- http://pyserial-asyncio.readthedocs.io/
+- https://github.com/pyserial/pyserial-asyncio
-.. function:: create_serial_connection(loop, protocol_factory, \*args, \*\*kwargs)
-
- :param loop: The event handler
- :param protocol_factory: Factory function for a :class:`asyncio.Protocol`
- :param args: Passed to the :class:`serial.Serial` init function
- :param kwargs: Passed to the :class:`serial.Serial` init function
- :platform: Posix
-
- Get a connection making coroutine.
-
-Example::
-
- class Output(asyncio.Protocol):
- def connection_made(self, transport):
- self.transport = transport
- print('port opened', transport)
- transport.serial.rts = False
- transport.write(b'hello world\n')
-
- def data_received(self, data):
- print('data received', repr(data))
- self.transport.close()
-
- def connection_lost(self, exc):
- print('port closed')
- asyncio.get_event_loop().stop()
-
- loop = asyncio.get_event_loop()
- coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
- loop.run_until_complete(coro)
- loop.run_forever()
- loop.close()
+.. _`pyserial-asyncio`: https://pypi.python.org/pypi/pyserial-asyncio
diff --git a/documentation/url_handlers.rst b/documentation/url_handlers.rst
index ae331f9..81e6ff8 100644
--- a/documentation/url_handlers.rst
+++ b/documentation/url_handlers.rst
@@ -122,6 +122,8 @@ Supported options in the URL are:
not locked automatically (e.g. Posix).
+.. _spy:
+
``spy://``
==========
Wrapping the native serial port, this protocol makes it possible to
diff --git a/examples/at_protocol.py b/examples/at_protocol.py
index 36eb6bd..7d43007 100644
--- a/examples/at_protocol.py
+++ b/examples/at_protocol.py
@@ -91,7 +91,7 @@ class ATProtocol(serial.threaded.LineReader):
else:
lines.append(line)
except queue.Empty:
- raise ATException('AT command timeout (%r)' % (command,))
+ raise ATException('AT command timeout ({!r})'.format(command))
# test
@@ -123,16 +123,16 @@ if __name__ == '__main__':
"""Handle events and command responses starting with '+...'"""
if event.startswith('+RRBDRES') and self._awaiting_response_for.startswith('AT+JRBD'):
rev = event[9:9 + 12]
- mac = ':'.join('%02X' % ord(x) for x in rev.decode('hex')[::-1])
+ mac = ':'.join('{:02X}'.format(ord(x)) for x in rev.decode('hex')[::-1])
self.event_responses.put(mac)
else:
- logging.warning('unhandled event: %r' % event)
+ logging.warning('unhandled event: {!r}'.format(event))
def command_with_event_response(self, command):
"""Send a command that responds with '+...' line"""
with self.lock: # ensure that just one thread is sending commands at once
self._awaiting_response_for = command
- self.transport.write(b'%s\r\n' % (command.encode(self.ENCODING, self.UNICODE_HANDLING),))
+ self.transport.write(b'{}\r\n'.format(command.encode(self.ENCODING, self.UNICODE_HANDLING)))
response = self.event_responses.get()
self._awaiting_response_for = None
return response
@@ -143,7 +143,7 @@ if __name__ == '__main__':
self.command("AT+JRES", response='ROK') # SW-Reset BT module
def get_mac_address(self):
- # requests hardware / calibrationinfo as event
+ # requests hardware / calibration info as event
return self.command_with_event_response("AT+JRBD")
ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1)
diff --git a/examples/port_publisher.py b/examples/port_publisher.py
index cf44945..ae07f77 100755
--- a/examples/port_publisher.py
+++ b/examples/port_publisher.py
@@ -86,7 +86,7 @@ class ZeroconfService:
self.group = None
def __str__(self):
- return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
+ return "{!r} @ {}:{} ({})".format(self.name, self.host, self.port, self.stype)
class Forwarder(ZeroconfService):
@@ -154,7 +154,7 @@ class Forwarder(ZeroconfService):
self.handle_server_error()
#~ raise
if self.log is not None:
- self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port))
+ self.log.info("{}: Waiting for connection on {}...".format(self.device, self.network_port))
# zeroconfig
self.publish()
@@ -165,7 +165,7 @@ class Forwarder(ZeroconfService):
def close(self):
"""Close all resources and unpublish service"""
if self.log is not None:
- self.log.info("%s: closing..." % (self.device, ))
+ self.log.info("{}: closing...".format(self.device))
self.alive = False
self.unpublish()
if self.server_socket:
@@ -291,7 +291,7 @@ class Forwarder(ZeroconfService):
self.socket.setblocking(0)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self.log is not None:
- self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1]))
+ self.log.warning('{}: Connected by {}:{}'.format(self.device, addr[0], addr[1]))
self.serial.rts = True
self.serial.dtr = True
if self.log is not None:
@@ -302,7 +302,7 @@ class Forwarder(ZeroconfService):
# reject connection if there is already one
connection.close()
if self.log is not None:
- self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1]))
+ self.log.warning('{}: Rejecting connect from {}:{}'.format(self.device, addr[0], addr[1]))
def handle_server_error(self):
"""Socket server fails"""
@@ -326,7 +326,7 @@ class Forwarder(ZeroconfService):
self.socket.close()
self.socket = None
if self.log is not None:
- self.log.warning('%s: Disconnected' % self.device)
+ self.log.warning('{}: Disconnected'.format(self.device))
def test():
@@ -451,7 +451,7 @@ terminated, it waits for the next connect.
# exit first parent
sys.exit(0)
except OSError as e:
- log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
+ log.critical("fork #1 failed: {} ({})\n".format(e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
@@ -465,10 +465,10 @@ terminated, it waits for the next connect.
if pid > 0:
# exit from second parent, save eventual PID before
if args.pidfile is not None:
- open(args.pidfile, 'w').write("%d" % pid)
+ open(args.pidfile, 'w').write("{}".formt(pid))
sys.exit(0)
except OSError as e:
- log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
+ log.critical("fork #2 failed: {} ({})\n".format(e.errno, e.strerror))
sys.exit(1)
if args.logfile is None:
@@ -512,7 +512,7 @@ terminated, it waits for the next connect.
except KeyError:
pass
else:
- log.info("unpublish: %s" % (forwarder))
+ log.info("unpublish: {}".format(forwarder))
alive = True
next_check = 0
@@ -526,7 +526,7 @@ terminated, it waits for the next connect.
connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)]
# Handle devices that are published, but no longer connected
for device in set(published).difference(connected):
- log.info("unpublish: %s" % (published[device]))
+ log.info("unpublish: {}".format(published[device]))
unpublish(published[device])
# Handle devices that are connected but not yet published
for device in sorted(set(connected).difference(published)):
@@ -537,11 +537,11 @@ terminated, it waits for the next connect.
port += 1
published[device] = Forwarder(
device,
- "%s on %s" % (device, hostname),
+ "{} on {}".format(device, hostname),
port,
on_close=unpublish,
log=log)
- log.warning("publish: %s" % (published[device]))
+ log.warning("publish: {}".format(published[device]))
published[device].open()
# select_start = time.time()
diff --git a/examples/rfc2217_server.py b/examples/rfc2217_server.py
index 7830e40..5955fc0 100755
--- a/examples/rfc2217_server.py
+++ b/examples/rfc2217_server.py
@@ -58,7 +58,7 @@ class Redirector(object):
# escape outgoing data when needed (Telnet IAC (0xff) character)
self.write(serial.to_bytes(self.rfc2217.escape(data)))
except socket.error as msg:
- self.log.error('%s' % (msg,))
+ self.log.error('{}'.format(msg))
# probably got disconnected
break
self.alive = False
@@ -78,7 +78,7 @@ class Redirector(object):
break
self.serial.write(serial.to_bytes(self.rfc2217.filter(data)))
except socket.error as msg:
- self.log.error('%s' % (msg,))
+ self.log.error('{}'.format(msg))
# probably got disconnected
break
self.stop()
diff --git a/examples/tcp_serial_redirect.py b/examples/tcp_serial_redirect.py
index 97a73b4..8440296 100755
--- a/examples/tcp_serial_redirect.py
+++ b/examples/tcp_serial_redirect.py
@@ -2,7 +2,7 @@
#
# Redirect data from a TCP/IP connection to a serial port and vice versa.
#
-# (C) 2002-2015 Chris Liechti <cliechti@gmx.net>
+# (C) 2002-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
@@ -10,6 +10,7 @@ import sys
import socket
import serial
import serial.threaded
+import time
class SerialToNet(serial.threaded.Protocol):
@@ -56,6 +57,12 @@ it waits for the next connect.
help='suppress non error messages',
default=False)
+ parser.add_argument(
+ '--develop',
+ action='store_true',
+ help='Development mode, prints Python internals on errors',
+ default=False)
+
group = parser.add_argument_group('serial port')
group.add_argument(
@@ -91,12 +98,20 @@ it waits for the next connect.
group = parser.add_argument_group('network settings')
- group.add_argument(
+ exclusive_group = group.add_mutually_exclusive_group()
+
+ exclusive_group.add_argument(
'-P', '--localport',
type=int,
help='local TCP port',
default=7777)
+ exclusive_group.add_argument(
+ '-c', '--client',
+ metavar='HOST:PORT',
+ help='make the connection as a client, instead of running a server',
+ default=False)
+
args = parser.parse_args()
# connect to serial port
@@ -127,15 +142,40 @@ it waits for the next connect.
serial_worker = serial.threaded.ReaderThread(ser, ser_to_net)
serial_worker.start()
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srv.bind(('', args.localport))
- srv.listen(1)
+ if not args.client:
+ srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ srv.bind(('', args.localport))
+ srv.listen(1)
try:
+ intentional_exit = False
while True:
- sys.stderr.write('Waiting for connection on {}...\n'.format(args.localport))
- client_socket, addr = srv.accept()
- sys.stderr.write('Connected by {}\n'.format(addr))
+ if args.client:
+ host, port = args.client.split(':')
+ sys.stderr.write("Opening connection to {}:{}...\n".format(host, port))
+ client_socket = socket.socket()
+ try:
+ client_socket.connect((host, int(port)))
+ except socket.error as msg:
+ sys.stderr.write('WARNING: {}\n'.format(msg))
+ time.sleep(5) # intentional delay on reconnection as client
+ continue
+ sys.stderr.write('Connected\n')
+ client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ #~ client_socket.settimeout(5)
+ else:
+ sys.stderr.write('Waiting for connection on {}...\n'.format(args.localport))
+ client_socket, addr = srv.accept()
+ sys.stderr.write('Connected by {}\n'.format(addr))
+ # More quickly detect bad clients who quit without closing the
+ # connection: After 1 second of idle, start sending TCP keep-alive
+ # packets every 1 second. If 3 consecutive keep-alive packets
+ # fail, assume the client is gone and close the connection.
+ client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
+ client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
+ client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
+ client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
ser_to_net.socket = client_socket
# enter network <-> serial loop
@@ -146,15 +186,24 @@ it waits for the next connect.
break
ser.write(data) # get a bunch of bytes and send them
except socket.error as msg:
- sys.stderr.write('ERROR: %s\n' % msg)
+ if args.develop:
+ raise
+ sys.stderr.write('ERROR: {}\n'.format(msg))
# probably got disconnected
break
+ except KeyboardInterrupt:
+ intentional_exit = True
+ raise
except socket.error as msg:
+ if args.develop:
+ raise
sys.stderr.write('ERROR: {}\n'.format(msg))
finally:
ser_to_net.socket = None
sys.stderr.write('Disconnected\n')
client_socket.close()
+ if args.client and not intentional_exit:
+ time.sleep(5) # intentional delay on reconnection as client
except KeyboardInterrupt:
pass
diff --git a/examples/wxSerialConfigDialog.py b/examples/wxSerialConfigDialog.py
index a29b67d..0064a9c 100755
--- a/examples/wxSerialConfigDialog.py
+++ b/examples/wxSerialConfigDialog.py
@@ -99,7 +99,7 @@ class SerialConfigDialog(wx.Dialog):
self.choice_port.Clear()
self.ports = []
for n, (portname, desc, hwid) in enumerate(sorted(serial.tools.list_ports.comports())):
- self.choice_port.Append('%s - %s' % (portname, desc))
+ self.choice_port.Append(u'{} - {}'.format(portname, desc))
self.ports.append(portname)
if self.serial.name == portname:
preferred_index = n
@@ -115,7 +115,7 @@ class SerialConfigDialog(wx.Dialog):
if preferred_index is not None:
self.combo_box_baudrate.SetSelection(preferred_index)
else:
- self.combo_box_baudrate.SetValue(u'%d' % (self.serial.baudrate,))
+ self.combo_box_baudrate.SetValue(u'{}'.format(self.serial.baudrate))
if self.show & SHOW_FORMAT:
# fill in data bits and select current setting
self.choice_databits.Clear()
diff --git a/examples/wxTerminal.py b/examples/wxTerminal.py
index 4ebabb7..0811721 100755
--- a/examples/wxTerminal.py
+++ b/examples/wxTerminal.py
@@ -274,15 +274,15 @@ class TerminalFrame(wx.Frame):
dlg.ShowModal()
else:
self.StartThread()
- self.SetTitle("Serial Terminal on %s [%s,%s,%s,%s%s%s]" % (
- self.serial.portstr,
- self.serial.baudrate,
- self.serial.bytesize,
- self.serial.parity,
- self.serial.stopbits,
- ' RTS/CTS' if self.serial.rtscts else '',
- ' Xon/Xoff' if self.serial.xonxoff else '',
- ))
+ self.SetTitle("Serial Terminal on {} [{},{},{},{}{}{}]".format(
+ self.serial.portstr,
+ self.serial.baudrate,
+ self.serial.bytesize,
+ self.serial.parity,
+ self.serial.stopbits,
+ ' RTS/CTS' if self.serial.rtscts else '',
+ ' Xon/Xoff' if self.serial.xonxoff else '',
+ ))
ok = True
else:
# on startup, dialog aborted
diff --git a/serial/rfc2217.py b/serial/rfc2217.py
index 5e3cbe3..dee5c2b 100644
--- a/serial/rfc2217.py
+++ b/serial/rfc2217.py
@@ -73,7 +73,8 @@ except ImportError:
import queue as Queue
import serial
-from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, portNotOpenError
+from serial.serialutil import SerialBase, SerialException, to_bytes, \
+ iterbytes, portNotOpenError, Timeout
# port string is expected to be something like this:
# rfc2217://host:port
@@ -350,8 +351,8 @@ class TelnetSubnegotiation(object):
can also throw a value error when the answer from the server does not
match the value sent.
"""
- timeout_time = time.time() + timeout
- while time.time() < timeout_time:
+ timeout_timer = Timeout(timeout)
+ while not timeout_timer.expired():
time.sleep(0.05) # prevent 100% CPU load
if self.is_ready():
break
@@ -384,7 +385,7 @@ class Serial(SerialBase):
self._socket = None
self._linestate = 0
self._modemstate = None
- self._modemstate_expires = 0
+ self._modemstate_timeout = Timeout(-1)
self._remote_suspend_flow = False
self._write_lock = None
self.logger = None
@@ -453,7 +454,7 @@ class Serial(SerialBase):
# cache for line and modem states that the server sends to us
self._linestate = 0
self._modemstate = None
- self._modemstate_expires = 0
+ self._modemstate_timeout = Timeout(-1)
# RFC 2217 flow control between server and client
self._remote_suspend_flow = False
@@ -469,8 +470,8 @@ class Serial(SerialBase):
if option.state is REQUESTED:
self.telnet_send_option(option.send_yes, option.option)
# now wait until important options are negotiated
- timeout_time = time.time() + self._network_timeout
- while time.time() < timeout_time:
+ timeout = Timeout(self._network_timeout)
+ while not timeout.expired():
time.sleep(0.05) # prevent 100% CPU load
if sum(o.active for o in mandadory_options) == sum(o.state != INACTIVE for o in mandadory_options):
break
@@ -518,8 +519,8 @@ class Serial(SerialBase):
items = self._rfc2217_port_settings.values()
if self.logger:
self.logger.debug("Negotiating settings: {}".format(items))
- timeout_time = time.time() + self._network_timeout
- while time.time() < timeout_time:
+ timeout = Timeout(self._network_timeout)
+ while not timeout.expired():
time.sleep(0.05) # prevent 100% CPU load
if sum(o.active for o in items) == len(items):
break
@@ -822,7 +823,7 @@ class Serial(SerialBase):
if self.logger:
self.logger.info("NOTIFY_MODEMSTATE: {}".format(self._modemstate))
# update time when we think that a poll would make sense
- self._modemstate_expires = time.time() + 0.3
+ self._modemstate_timeout.restart(0.3)
elif suboption[1:2] == FLOWCONTROL_SUSPEND:
self._remote_suspend_flow = True
elif suboption[1:2] == FLOWCONTROL_RESUME:
@@ -893,17 +894,17 @@ class Serial(SerialBase):
etc.)
"""
# active modem state polling enabled? is the value fresh enough?
- if self._poll_modem_state and self._modemstate_expires < time.time():
+ if self._poll_modem_state and self._modemstate_timeout.expired():
if self.logger:
self.logger.debug('polling modem state')
# when it is older, request an update
self.rfc2217_send_subnegotiation(NOTIFY_MODEMSTATE)
- timeout_time = time.time() + self._network_timeout
- while time.time() < timeout_time:
+ timeout = Timeout(self._network_timeout)
+ while not timeout.expired():
time.sleep(0.05) # prevent 100% CPU load
# when expiration time is updated, it means that there is a new
# value
- if self._modemstate_expires > time.time():
+ if not self._modemstate_timeout.expired():
break
else:
if self.logger:
diff --git a/serial/serialposix.py b/serial/serialposix.py
index 913b643..01848e9 100644
--- a/serial/serialposix.py
+++ b/serial/serialposix.py
@@ -34,10 +34,10 @@ import select
import struct
import sys
import termios
-import time
import serial
-from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError
+from serial.serialutil import SerialBase, SerialException, to_bytes, \
+ portNotOpenError, writeTimeoutError, Timeout
class PlatformSpecificBase(object):
@@ -49,6 +49,11 @@ class PlatformSpecificBase(object):
def _set_rs485_mode(self, rs485_settings):
raise NotImplementedError('RS485 not supported on this platform')
+
+# some systems support an extra flag to enable the two in POSIX unsupported
+# paritiy settings for MARK and SPACE
+CMSPAR = 0 # default, for unsupported platforms, override below
+
# try to detect the OS so that a device can be selected...
# this code block should supply a device() and set_special_baudrate() function
# for the platform
@@ -57,6 +62,9 @@ plat = sys.platform.lower()
if plat[:5] == 'linux': # Linux (confirmed) # noqa
import array
+ # extra termios flags
+ CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity
+
# baudrate ioctls
TCGETS2 = 0x802C542A
TCSETS2 = 0x402C542B
@@ -181,6 +189,21 @@ elif plat[:6] == 'darwin': # OS X
buf = array.array('i', [baudrate])
fcntl.ioctl(self.fd, IOSSIOSPEED, buf, 1)
+elif plat[:3] == 'bsd' or \
+ plat[:7] == 'freebsd' or \
+ plat[:6] == 'netbsd' or \
+ plat[:7] == 'openbsd':
+
+ class ReturnBaudrate(object):
+ def __getitem__(self, key):
+ return key
+
+ class PlatformSpecific(PlatformSpecificBase):
+ # Only tested on FreeBSD:
+ # The baud rate may be passed in as
+ # a literal value.
+ BAUDRATE_CONSTANTS = ReturnBaudrate()
+
else:
class PlatformSpecific(PlatformSpecificBase):
pass
@@ -220,8 +243,6 @@ TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)
TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427)
TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428)
-CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity
-
class Serial(SerialBase, PlatformSpecific):
"""\
@@ -266,7 +287,8 @@ class Serial(SerialBase, PlatformSpecific):
if not self._rtscts:
self._update_rts_state()
except IOError as e:
- if e.errno in (22, 25): # ignore Invalid argument and Inappropriate ioctl
+ if e.errno in (errno.EINVAL, errno.ENOTTY):
+ # ignore Invalid argument and Inappropriate ioctl
pass
else:
raise
@@ -349,15 +371,16 @@ class Serial(SerialBase, PlatformSpecific):
# setup parity
iflag &= ~(termios.INPCK | termios.ISTRIP)
if self._parity == serial.PARITY_NONE:
- cflag &= ~(termios.PARENB | termios.PARODD)
+ cflag &= ~(termios.PARENB | termios.PARODD | CMSPAR)
elif self._parity == serial.PARITY_EVEN:
- cflag &= ~(termios.PARODD)
+ cflag &= ~(termios.PARODD | CMSPAR)
cflag |= (termios.PARENB)
elif self._parity == serial.PARITY_ODD:
+ cflag &= ~CMSPAR
cflag |= (termios.PARENB | termios.PARODD)
- elif self._parity == serial.PARITY_MARK and plat[:5] == 'linux':
+ elif self._parity == serial.PARITY_MARK and CMSPAR:
cflag |= (termios.PARENB | CMSPAR | termios.PARODD)
- elif self._parity == serial.PARITY_SPACE and plat[:5] == 'linux':
+ elif self._parity == serial.PARITY_SPACE and CMSPAR:
cflag |= (termios.PARENB | CMSPAR)
cflag &= ~(termios.PARODD)
else:
@@ -443,11 +466,10 @@ class Serial(SerialBase, PlatformSpecific):
if not self.is_open:
raise portNotOpenError
read = bytearray()
- timeout = self._timeout
+ timeout = Timeout(self._timeout)
while len(read) < size:
try:
- start_time = time.time()
- ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout)
+ ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout.time_left())
if self.pipe_abort_read_r in ready:
os.read(self.pipe_abort_read_r, 1000)
break
@@ -479,10 +501,8 @@ class Serial(SerialBase, PlatformSpecific):
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] != errno.EAGAIN:
raise SerialException('read failed: {}'.format(e))
- if timeout is not None:
- timeout -= time.time() - start_time
- if timeout <= 0:
- break
+ if timeout.expired():
+ break
return bytes(read)
def cancel_read(self):
@@ -497,30 +517,27 @@ class Serial(SerialBase, PlatformSpecific):
raise portNotOpenError
d = to_bytes(data)
tx_len = len(d)
- timeout = self._write_timeout
- if timeout and timeout > 0: # Avoid comparing None with zero
- timeout += time.time()
+ timeout = Timeout(self._write_timeout)
while tx_len > 0:
try:
n = os.write(self.fd, d)
- if timeout == 0:
+ if timeout.is_non_blocking:
# Zero timeout indicates non-blocking - simply return the
# number of bytes of data actually written
return n
- elif timeout and timeout > 0: # Avoid comparing None with zero
+ elif not timeout.is_infinite:
# when timeout is set, use select to wait for being ready
# with the time left as timeout
- timeleft = timeout - time.time()
- if timeleft < 0:
+ if timeout.expired():
raise writeTimeoutError
- abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeleft)
+ abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left())
if abort:
os.read(self.pipe_abort_write_r, 1000)
break
if not ready:
raise writeTimeoutError
else:
- assert timeout is None
+ assert timeout.time_left() is None
# wait for write operation
abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], None)
if abort:
@@ -536,7 +553,7 @@ class Serial(SerialBase, PlatformSpecific):
if v.errno != errno.EAGAIN:
raise SerialException('write failed: {}'.format(v))
# still calculate and check timeout
- if timeout and timeout - time.time() < 0:
+ if timeout.expired():
raise writeTimeoutError
return len(data)
@@ -730,6 +747,9 @@ class VTIMESerial(Serial):
if self._inter_byte_timeout is not None:
vmin = 1
vtime = int(self._inter_byte_timeout * 10)
+ elif self._timeout is None:
+ vmin = 1
+ vtime = 0
else:
vmin = 0
vtime = int(self._timeout * 10)
@@ -764,3 +784,6 @@ class VTIMESerial(Serial):
break
read.extend(buf)
return bytes(read)
+
+ # hack to make hasattr return false
+ cancel_read = property()
diff --git a/serial/serialutil.py b/serial/serialutil.py
index 474b4c2..636a10c 100644
--- a/serial/serialutil.py
+++ b/serial/serialutil.py
@@ -3,7 +3,7 @@
# Base class and support functions used by various backends.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
-# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
+# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
@@ -62,14 +62,9 @@ def to_bytes(seq):
elif isinstance(seq, unicode):
raise TypeError('unicode strings are not supported, please encode to bytes: {!r}'.format(seq))
else:
- b = bytearray()
- for item in seq:
- # this one handles int and bytes in Python 2.7
- # add conversion in case of Python 3.x
- if isinstance(item, bytes):
- item = ord(item)
- b.append(item)
- return bytes(b)
+ # handle list of integers and bytes (one or more items) for Python 2 and 3
+ return bytes(bytearray(seq))
+
# create control bytes
XON = to_bytes([17])
@@ -104,6 +99,65 @@ writeTimeoutError = SerialTimeoutException('Write timeout')
portNotOpenError = SerialException('Attempting to use a port that is not open')
+class Timeout(object):
+ """\
+ Abstraction for timeout operations. Using time.monotonic() if available
+ or time.time() in all other cases.
+
+ The class can also be initialized with 0 or None, in order to support
+ non-blocking and fully blocking I/O operations. The attributes
+ is_non_blocking and is_infinite are set accordingly.
+ """
+ if hasattr(time, 'monotonic'):
+ # Timeout implementation with time.monotonic(). This function is only
+ # supported by Python 3.3 and above. It returns a time in seconds
+ # (float) just as time.time(), but is not affected by system clock
+ # adjustments.
+ TIME = time.monotonic
+ else:
+ # Timeout implementation with time.time(). This is compatible with all
+ # Python versions but has issues if the clock is adjusted while the
+ # timeout is running.
+ TIME = time.time
+
+ def __init__(self, duration):
+ """Initialize a timeout with given duration"""
+ self.is_infinite = (duration is None)
+ self.is_non_blocking = (duration == 0)
+ self.duration = duration
+ if duration is not None:
+ self.target_time = self.TIME() + duration
+ else:
+ self.target_time = None
+
+ def expired(self):
+ """Return a boolean, telling if the timeout has expired"""
+ return self.target_time is not None and self.time_left() <= 0
+
+ def time_left(self):
+ """Return how many seconds are left until the timeout expires"""
+ if self.is_non_blocking:
+ return 0
+ elif self.is_infinite:
+ return None
+ else:
+ delta = self.target_time - self.TIME()
+ if delta > self.duration:
+ # clock jumped, recalculate
+ self.target_time = self.TIME() + self.duration
+ return self.duration
+ else:
+ return max(0, delta)
+
+ def restart(self, duration):
+ """\
+ Restart a timeout, only supported if a timeout was already set up
+ before.
+ """
+ self.duration = duration
+ self.target_time = self.TIME() + duration
+
+
class SerialBase(io.RawIOBase):
"""\
Serial port base class. Provides __init__ function and properties to
diff --git a/serial/serialwin32.py b/serial/serialwin32.py
index 484c4a1..b275ea3 100644
--- a/serial/serialwin32.py
+++ b/serial/serialwin32.py
@@ -309,18 +309,29 @@ class Serial(SerialBase):
if data:
#~ win32event.ResetEvent(self._overlapped_write.hEvent)
n = win32.DWORD()
- err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
+ success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0)
+ if not success and win32.GetLastError() != win32.ERROR_IO_PENDING:
+ raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
+
# Wait for the write to complete.
#~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE)
- err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True)
+ win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True)
if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED:
return n.value # canceled IO is no error
if n.value != len(data):
raise writeTimeoutError
- return n.value
+ return n.value
+ else:
+ errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError()
+ if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY,
+ win32.ERROR_OPERATION_ABORTED):
+ return 0
+ elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
+ # no info on true length provided by OS function in async mode
+ return len(data)
+ else:
+ raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
else:
return 0
@@ -332,7 +343,7 @@ class Serial(SerialBase):
while self.out_waiting:
time.sleep(0.05)
# XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
- # require overlapped IO and its also only possible to set a single mask
+ # require overlapped IO and it's also only possible to set a single mask
# on the port---
def reset_input_buffer(self):
@@ -405,7 +416,7 @@ class Serial(SerialBase):
def set_buffer_size(self, rx_size=4096, tx_size=None):
"""\
Recommend a buffer size to the driver (device driver can ignore this
- value). Must be called before the port is opended.
+ value). Must be called before the port is opened.
"""
if tx_size is None:
tx_size = rx_size
diff --git a/serial/threaded/__init__.py b/serial/threaded/__init__.py
index 325d4a3..74b6924 100644
--- a/serial/threaded/__init__.py
+++ b/serial/threaded/__init__.py
@@ -3,7 +3,7 @@
# Working with threading and pySerial
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
-# (C) 2015 Chris Liechti <cliechti@gmx.net>
+# (C) 2015-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
@@ -102,10 +102,10 @@ class FramedPacket(Protocol):
self.in_packet = True
elif byte == self.STOP:
self.in_packet = False
- self.handle_packet(self.packet)
+ self.handle_packet(bytes(self.packet)) # make read-only copy
del self.packet[:]
elif self.in_packet:
- self.packet.append(byte)
+ self.packet.extend(byte)
else:
self.handle_out_of_packet_data(byte)
@@ -273,7 +273,7 @@ if __name__ == '__main__':
self.write_line('hello world')
def handle_line(self, data):
- sys.stdout.write('line received: {}\n'.format(repr(data)))
+ sys.stdout.write('line received: {!r}\n'.format(data))
def connection_lost(self, exc):
if exc:
diff --git a/serial/tools/list_ports_common.py b/serial/tools/list_ports_common.py
index e5935c9..df12939 100644
--- a/serial/tools/list_ports_common.py
+++ b/serial/tools/list_ports_common.py
@@ -55,8 +55,8 @@ class ListPortInfo(object):
def usb_info(self):
"""return a string with USB related information about device"""
return 'USB VID:PID={:04X}:{:04X}{}{}'.format(
- self.vid,
- self.pid,
+ self.vid or 0,
+ self.pid or 0,
' SER={}'.format(self.serial_number) if self.serial_number is not None else '',
' LOCATION={}'.format(self.location) if self.location is not None else '')
diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py
index a2922f8..a070559 100644
--- a/serial/tools/list_ports_windows.py
+++ b/serial/tools/list_ports_windows.py
@@ -17,7 +17,6 @@ from ctypes.wintypes import DWORD
from ctypes.wintypes import WORD
from ctypes.wintypes import LONG
from ctypes.wintypes import ULONG
-from ctypes.wintypes import LPCSTR
from ctypes.wintypes import HKEY
from ctypes.wintypes import BYTE
import serial
@@ -30,11 +29,12 @@ def ValidHandle(value, func, arguments):
raise ctypes.WinError()
return value
+
NULL = 0
HDEVINFO = ctypes.c_void_p
-PCTSTR = ctypes.c_char_p
-PTSTR = ctypes.c_void_p
-CHAR = ctypes.c_char
+LPCTSTR = ctypes.c_wchar_p
+PCTSTR = ctypes.c_wchar_p
+PTSTR = ctypes.c_wchar_p
LPDWORD = PDWORD = ctypes.POINTER(DWORD)
#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE)
LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types
@@ -43,20 +43,6 @@ ACCESS_MASK = DWORD
REGSAM = ACCESS_MASK
-def byte_buffer(length):
- """Get a buffer for a string"""
- return (BYTE * length)()
-
-
-def string(buffer):
- s = []
- for c in buffer:
- if c == 0:
- break
- s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
- return ''.join(s)
-
-
class GUID(ctypes.Structure):
_fields_ = [
('Data1', DWORD),
@@ -86,6 +72,7 @@ class SP_DEVINFO_DATA(ctypes.Structure):
def __str__(self):
return "ClassGuid:{} DevInst:{}".format(self.ClassGuid, self.DevInst)
+
PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
@@ -95,7 +82,7 @@ SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
SetupDiDestroyDeviceInfoList.restype = BOOL
-SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA
+SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameW
SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD]
SetupDiClassGuidsFromName.restype = BOOL
@@ -103,16 +90,16 @@ SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo
SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA]
SetupDiEnumDeviceInfo.restype = BOOL
-SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA
+SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsW
SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
SetupDiGetClassDevs.restype = HDEVINFO
SetupDiGetClassDevs.errcheck = ValidHandle
-SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA
+SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyW
SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
SetupDiGetDeviceRegistryProperty.restype = BOOL
-SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA
+SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdW
SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD]
SetupDiGetDeviceInstanceId.restype = BOOL
@@ -125,8 +112,8 @@ RegCloseKey = advapi32.RegCloseKey
RegCloseKey.argtypes = [HKEY]
RegCloseKey.restype = LONG
-RegQueryValueEx = advapi32.RegQueryValueExA
-RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
+RegQueryValueEx = advapi32.RegQueryValueExW
+RegQueryValueEx.argtypes = [HKEY, LPCTSTR , LPDWORD, LPDWORD, LPBYTE, LPDWORD]
RegQueryValueEx.restype = LONG
@@ -141,17 +128,13 @@ DICS_FLAG_GLOBAL = 1
DIREG_DEV = 0x00000001
KEY_READ = 0x20019
-# workaround for compatibility between Python 2.x and 3.x
-Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
-PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
-
def iterate_comports():
"""Return a generator that yields descriptions for serial ports"""
GUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough...
guids_size = DWORD()
if not SetupDiClassGuidsFromName(
- Ports,
+ "Ports",
GUIDs,
ctypes.sizeof(GUIDs),
ctypes.byref(guids_size)):
@@ -179,11 +162,11 @@ def iterate_comports():
0,
DIREG_DEV, # DIREG_DRV for SW info
KEY_READ)
- port_name_buffer = byte_buffer(250)
+ port_name_buffer = ctypes.create_unicode_buffer(250)
port_name_length = ULONG(ctypes.sizeof(port_name_buffer))
RegQueryValueEx(
hkey,
- PortName,
+ "PortName",
None,
None,
ctypes.byref(port_name_buffer),
@@ -193,16 +176,17 @@ def iterate_comports():
# unfortunately does this method also include parallel ports.
# we could check for names starting with COM or just exclude LPT
# and hope that other "unknown" names are serial ports...
- if string(port_name_buffer).startswith('LPT'):
+ if port_name_buffer.value.startswith('LPT'):
continue
# hardware ID
- szHardwareID = byte_buffer(250)
+ szHardwareID = ctypes.create_unicode_buffer(250)
# try to get ID that includes serial number
if not SetupDiGetDeviceInstanceId(
g_hdi,
ctypes.byref(devinfo),
- ctypes.byref(szHardwareID),
+ #~ ctypes.byref(szHardwareID),
+ szHardwareID,
ctypes.sizeof(szHardwareID) - 1,
None):
# fall back to more generic hardware ID if that would fail
@@ -218,21 +202,22 @@ def iterate_comports():
if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
raise ctypes.WinError()
# stringify
- szHardwareID_str = string(szHardwareID)
+ szHardwareID_str = szHardwareID.value
- info = list_ports_common.ListPortInfo(string(port_name_buffer))
+ info = list_ports_common.ListPortInfo(port_name_buffer.value)
# in case of USB, make a more readable string, similar to that form
# that we also generate on other platforms
if szHardwareID_str.startswith('USB'):
- m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I)
+ m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I)
if m:
info.vid = int(m.group(1), 16)
- info.pid = int(m.group(2), 16)
- if m.group(4):
- info.serial_number = m.group(4)
+ if m.group(3):
+ info.pid = int(m.group(3), 16)
+ if m.group(5):
+ info.serial_number = m.group(5)
# calculate a location string
- loc_path_str = byte_buffer(250)
+ loc_path_str = ctypes.create_unicode_buffer(250)
if SetupDiGetDeviceRegistryProperty(
g_hdi,
ctypes.byref(devinfo),
@@ -241,8 +226,7 @@ def iterate_comports():
ctypes.byref(loc_path_str),
ctypes.sizeof(loc_path_str) - 1,
None):
- #~ print (string(loc_path_str))
- m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str))
+ m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', loc_path_str.value)
location = []
for g in m:
if g.group(1):
@@ -269,7 +253,7 @@ def iterate_comports():
info.hwid = szHardwareID_str
# friendly name
- szFriendlyName = byte_buffer(250)
+ szFriendlyName = ctypes.create_unicode_buffer(250)
if SetupDiGetDeviceRegistryProperty(
g_hdi,
ctypes.byref(devinfo),
@@ -279,7 +263,7 @@ def iterate_comports():
ctypes.byref(szFriendlyName),
ctypes.sizeof(szFriendlyName) - 1,
None):
- info.description = string(szFriendlyName)
+ info.description = szFriendlyName.value
#~ else:
# Ignore ERROR_INSUFFICIENT_BUFFER
#~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 145996f..7c68e9d 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -169,7 +169,7 @@ elif os.name == 'posix':
return c
def cancel(self):
- os.write(self.pipe_w, "x")
+ os.write(self.pipe_w, b"x")
def cleanup(self):
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
@@ -553,7 +553,7 @@ class Miniterm(object):
if new_filters:
for f in new_filters:
if f not in TRANSFORMATIONS:
- sys.stderr.write('--- unknown filter: {}'.format(repr(f)))
+ sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
break
else:
self.filters = new_filters
diff --git a/serial/urlhandler/protocol_loop.py b/serial/urlhandler/protocol_loop.py
index 819da77..7bf6cf9 100644
--- a/serial/urlhandler/protocol_loop.py
+++ b/serial/urlhandler/protocol_loop.py
@@ -43,10 +43,11 @@ class Serial(SerialBase):
9600, 19200, 38400, 57600, 115200)
def __init__(self, *args, **kwargs):
- super(Serial, self).__init__(*args, **kwargs)
self.buffer_size = 4096
self.queue = None
self.logger = None
+ self._cancel_write = False
+ super(Serial, self).__init__(*args, **kwargs)
def open(self):
"""\
@@ -151,7 +152,7 @@ class Serial(SerialBase):
if self._timeout == 0:
break
else:
- if data is not None:
+ if b is not None:
data += b
size -= 1
else:
@@ -164,12 +165,19 @@ class Serial(SerialBase):
break
return bytes(data)
+ def cancel_read(self):
+ self.queue.put_nowait(None)
+
+ def cancel_write(self):
+ self._cancel_write = True
+
def write(self, data):
"""\
Output the given byte string over the serial port. Can block if the
connection is blocked. May raise SerialException if the connection is
closed.
"""
+ self._cancel_write = False
if not self.is_open:
raise portNotOpenError
data = to_bytes(data)
@@ -178,7 +186,13 @@ class Serial(SerialBase):
# when a write timeout is configured check if we would be successful
# (not sending anything, not even the part that would have time)
if self._write_timeout is not None and time_used_to_send > self._write_timeout:
- time.sleep(self._write_timeout) # must wait so that unit test succeeds
+ # must wait so that unit test succeeds
+ time_left = self._write_timeout
+ while time_left > 0 and not self._cancel_write:
+ time.sleep(min(time_left, 0.5))
+ time_left -= 0.5
+ if self._cancel_write:
+ return 0 # XXX
raise writeTimeoutError
for byte in iterbytes(data):
self.queue.put(byte, timeout=self._write_timeout)
diff --git a/serial/win32.py b/serial/win32.py
index 31f21ee..905ce0f 100644
--- a/serial/win32.py
+++ b/serial/win32.py
@@ -218,10 +218,14 @@ FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int
EV_DSR = 16 # Variable c_int
MAXDWORD = 4294967295 # Variable c_uint
EV_RLSD = 32 # Variable c_int
+
ERROR_SUCCESS = 0
+ERROR_NOT_ENOUGH_MEMORY = 8
ERROR_OPERATION_ABORTED = 995
ERROR_IO_INCOMPLETE = 996
ERROR_IO_PENDING = 997 # Variable c_long
+ERROR_INVALID_USER_BUFFER = 1784
+
MS_CTS_ON = 16 # Variable c_ulong
EV_EVENT1 = 2048 # Variable c_int
EV_RX80FULL = 1024 # Variable c_int
diff --git a/setup.py b/setup.py
index 20c2a22..f2b60a6 100644
--- a/setup.py
+++ b/setup.py
@@ -9,14 +9,46 @@
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
+import io
+import os
+import re
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
-import serial
-version = serial.VERSION
+
+def read(*names, **kwargs):
+ """Python 2 and Python 3 compatible text file reading.
+
+ Required for single-sourcing the version string.
+ """
+ with io.open(
+ os.path.join(os.path.dirname(__file__), *names),
+ encoding=kwargs.get("encoding", "utf8")
+ ) as fp:
+ return fp.read()
+
+
+def find_version(*file_paths):
+ """
+ Search the file for a version string.
+
+ file_path contain string path components.
+
+ Reads the supplied Python module as text without importing it.
+ """
+ version_file = read(*file_paths)
+ version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
+ version_file, re.M)
+ if version_match:
+ return version_match.group(1)
+ raise RuntimeError("Unable to find version string.")
+
+
+version = find_version('serial', '__init__.py')
+
setup(
name="pyserial",
diff --git a/test/handlers/protocol_test.py b/test/handlers/protocol_test.py
index 42ac4b2..f2e572f 100644
--- a/test/handlers/protocol_test.py
+++ b/test/handlers/protocol_test.py
@@ -71,9 +71,9 @@ class DummySerial(SerialBase):
self.logger.setLevel(LOGGER_LEVELS[value])
self.logger.debug('enabled logging')
else:
- raise ValueError('unknown option: %r' % (option,))
+ raise ValueError('unknown option: {!r}'.format(option))
except ValueError as e:
- raise SerialException('expected a string in the form "[test://][option[/option...]]": %s' % e)
+ raise SerialException('expected a string in the form "[test://][option[/option...]]": {}'.format(e))
return (host, port)
# - - - - - - - - - - - - - - - - - - - - - - - -
@@ -120,26 +120,26 @@ class DummySerial(SerialBase):
duration."""
if not self._isOpen: raise portNotOpenError
if self.logger:
- self.logger.info('ignored sendBreak(%r)' % (duration,))
+ self.logger.info('ignored sendBreak({!r})'.format(duration))
def setBreak(self, level=True):
"""Set break: Controls TXD. When active, to transmitting is
possible."""
if not self._isOpen: raise portNotOpenError
if self.logger:
- self.logger.info('ignored setBreak(%r)' % (level,))
+ self.logger.info('ignored setBreak({!r})'.format(level))
def setRTS(self, level=True):
"""Set terminal status line: Request To Send"""
if not self._isOpen: raise portNotOpenError
if self.logger:
- self.logger.info('ignored setRTS(%r)' % (level,))
+ self.logger.info('ignored setRTS({!r})'.format(level))
def setDTR(self, level=True):
"""Set terminal status line: Data Terminal Ready"""
if not self._isOpen: raise portNotOpenError
if self.logger:
- self.logger.info('ignored setDTR(%r)' % (level,))
+ self.logger.info('ignored setDTR({!r})'.format(level))
def getCTS(self):
"""Read terminal status line: Clear To Send"""
@@ -192,11 +192,11 @@ else:
if __name__ == '__main__':
import sys
s = Serial('test://logging=debug')
- sys.stdout.write('%s\n' % s)
+ sys.stdout.write('{}\n'.format(s))
sys.stdout.write("write...\n")
s.write("hello\n")
s.flush()
- sys.stdout.write("read: %s\n" % s.read(5))
+ sys.stdout.write("read: {}\n".format(s.read(5)))
s.close()
diff --git a/test/run_all_tests.py b/test/run_all_tests.py
index 355cd44..e0797e7 100644
--- a/test/run_all_tests.py
+++ b/test/run_all_tests.py
@@ -15,10 +15,10 @@ import sys
import os
# inject local copy to avoid testing the installed version instead of the one in the repo
-sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import serial # noqa
-print("Patching sys.path to test local version. Testing Version: %s" % (serial.VERSION,))
+print("Patching sys.path to test local version. Testing Version: {}".format(serial.VERSION))
PORT = 'loop://'
if len(sys.argv) > 1:
@@ -34,11 +34,11 @@ for modulename in [
try:
module = __import__(modulename)
except ImportError:
- print("skipping %s" % (modulename,))
+ print("skipping {}".format(modulename))
else:
module.PORT = PORT
testsuite = unittest.findTestCases(module)
- print("found %s tests in %r" % (testsuite.countTestCases(), modulename))
+ print("found {} tests in {!r}".format(testsuite.countTestCases(), modulename))
mainsuite.addTest(testsuite)
verbosity = 1
diff --git a/test/test.py b/test/test.py
index 8b38c8c..db03907 100644
--- a/test/test.py
+++ b/test/test.py
@@ -66,7 +66,7 @@ class Test4_Nonblocking(unittest.TestCase):
self.s.write(block)
# there might be a small delay until the character is ready (especially on win32)
time.sleep(0.05)
- self.assertEqual(self.s.in_waiting, length, "expected exactly %d character for inWainting()" % length)
+ self.assertEqual(self.s.in_waiting, length, "expected exactly {} character for inWainting()".format(length))
self.assertEqual(self.s.read(length), block) #, "expected a %r which was written before" % block)
self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")
@@ -131,7 +131,7 @@ class Test1_Forever(unittest.TestCase):
a character is sent after some time to terminate the test (SendEvent)."""
c = self.s.read(1)
if not (self.event.isSet() and c == b'E'):
- self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c))
+ self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), c))
class Test2_Forever(unittest.TestCase):
@@ -220,14 +220,14 @@ class Test_MoreTimeouts(unittest.TestCase):
t1 = time.time()
self.assertRaises(serial.SerialTimeoutException, self.s.write, b"timeout please" * 200)
t2 = time.time()
- self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval (%s)" % (t2 - t1))
+ self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval ({})".format(t2 - t1))
if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_advanced.py b/test/test_advanced.py
index 19559b2..527cc47 100644
--- a/test/test_advanced.py
+++ b/test/test_advanced.py
@@ -23,7 +23,7 @@ import unittest
import serial
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
class Test_ChangeAttributes(unittest.TestCase):
@@ -38,41 +38,19 @@ class Test_ChangeAttributes(unittest.TestCase):
def test_PortSetting(self):
self.s.port = PORT
- # portstr has to be set
- if isinstance(PORT, str):
- self.assertEqual(self.s.portstr.lower(), PORT.lower())
- else:
- self.assertEqual(self.s.portstr, serial.device(PORT))
+ self.assertEqual(self.s.portstr.lower(), PORT.lower())
# test internals
self.assertEqual(self.s._port, PORT)
# test on the fly change
self.s.open()
self.assertTrue(self.s.isOpen())
- #~ try:
- #~ self.s.port = 0
- #~ except serial.SerialException: # port not available on system
- #~ pass # can't test on this machine...
- #~ else:
- #~ self.failUnless(self.s.isOpen())
- #~ self.failUnlessEqual(self.s.port, 0)
- #~ self.failUnlessEqual(self.s.portstr, serial.device(0))
- #~ try:
- #~ self.s.port = 1
- #~ except serial.SerialException: # port not available on system
- #~ pass # can't test on this machine...
- #~ else:
- #~ self.failUnless(self.s.isOpen())
- #~ self.failUnlessEqual(self.s.port, 1)
- #~ self.failUnlessEqual(self.s.portstr, serial.device(1))
def test_DoubleOpen(self):
- self.s.port = PORT
self.s.open()
# calling open for a second time is an error
self.assertRaises(serial.SerialException, self.s.open)
def test_BaudrateSetting(self):
- self.s.port = PORT
self.s.open()
for baudrate in (300, 9600, 19200, 115200):
self.s.baudrate = baudrate
@@ -88,7 +66,6 @@ class Test_ChangeAttributes(unittest.TestCase):
# therefore the test can not choose a value that fails on any system.
def disabled_test_BaudrateSetting2(self):
# test illegal values, depending on machine/port some of these may be valid...
- self.s.port = PORT
self.s.open()
for illegal_value in (500000, 576000, 921600, 92160):
self.assertRaises(ValueError, setattr, self.s, 'baudrate', illegal_value)
@@ -164,7 +141,6 @@ class Test_ChangeAttributes(unittest.TestCase):
self.assertRaises(serial.SerialException, self.s.open)
def test_PortOpenClose(self):
- self.s.port = PORT
for i in range(3):
# open the port and check flag
self.assertTrue(not self.s.isOpen())
@@ -179,7 +155,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_asyncio.py b/test/test_asyncio.py
index 99548b3..5df8ef2 100644
--- a/test/test_asyncio.py
+++ b/test/test_asyncio.py
@@ -76,7 +76,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_cancel.py b/test/test_cancel.py
index 210891b..daab1ce 100644
--- a/test/test_cancel.py
+++ b/test/test_cancel.py
@@ -45,7 +45,7 @@ class TestCancelRead(unittest.TestCase):
self.s.read(1000)
t2 = time.time()
self.assertEqual(self.cancel_called, 1)
- self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2 - t1))
+ self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
#~ self.assertTrue(not self.s.isOpen())
#~ self.assertRaises(serial.SerialException, self.s.open)
@@ -89,7 +89,7 @@ class TestCancelWrite(unittest.TestCase):
self.s.write(DATA)
t2 = time.time()
self.assertEqual(self.cancel_called, 1)
- self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2 - t1))
+ self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
#~ self.assertTrue(not self.s.isOpen())
#~ self.assertRaises(serial.SerialException, self.s.open)
@@ -103,7 +103,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_high_load.py b/test/test_high_load.py
index 48ec9f3..b0bd773 100644
--- a/test/test_high_load.py
+++ b/test/test_high_load.py
@@ -25,7 +25,7 @@ import sys
import serial
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
BAUDRATE = 115200
#~ BAUDRATE=9600
@@ -61,7 +61,7 @@ class TestHighLoad(unittest.TestCase):
for i in range(self.N):
self.s.write(q)
read = self.s.read(len(q) * self.N)
- self.assertEqual(read, q * self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N * len(q)))
+ self.assertEqual(read, q * self.N, "expected what was written before. got {} bytes, expected {}".format(len(read), self.N * len(q)))
self.assertEqual(self.s.inWaiting(), 0) # "expected empty buffer after all sent chars are read")
@@ -70,7 +70,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_iolib.py b/test/test_iolib.py
index 71c7db6..84e3fa2 100644
--- a/test/test_iolib.py
+++ b/test/test_iolib.py
@@ -29,17 +29,8 @@ import sys
import unittest
import serial
-# trick to make that this test run under 2.6 and 3.x without modification.
-# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't
-# like u'nicode' strings with the prefix and it is not providing an unicode
-# function ('str' is now what 'unicode' used to be)
-if sys.version_info >= (3, 0):
- def unicode(x):
- return x
-
-
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
class Test_SerialAndIO(unittest.TestCase):
@@ -64,7 +55,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_readline.py b/test/test_readline.py
index ac0c813..34b807b 100644
--- a/test/test_readline.py
+++ b/test/test_readline.py
@@ -28,7 +28,7 @@ import serial
#~ print serial.VERSION
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
if sys.version_info >= (3, 0):
def data(string):
@@ -98,7 +98,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_rs485.py b/test/test_rs485.py
index 1d7ed09..e918f67 100644
--- a/test/test_rs485.py
+++ b/test/test_rs485.py
@@ -13,7 +13,7 @@ import serial
import serial.rs485
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
class Test_RS485_settings(unittest.TestCase):
@@ -43,6 +43,8 @@ class Test_RS485_class(unittest.TestCase):
"""Test RS485 class"""
def setUp(self):
+ if not isinstance(serial.serial_for_url(PORT), serial.Serial):
+ raise unittest.SkipTest("RS485 test only compatible with real serial port")
self.s = serial.rs485.RS485(PORT, timeout=1)
def tearDown(self):
@@ -59,7 +61,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_settings_dict.py b/test/test_settings_dict.py
index 12fd4c3..86ee4b2 100644
--- a/test/test_settings_dict.py
+++ b/test/test_settings_dict.py
@@ -15,7 +15,7 @@ import unittest
import serial
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff',
@@ -74,7 +74,7 @@ if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()
diff --git a/test/test_threaded.py b/test/test_threaded.py
new file mode 100644
index 0000000..40f45ad
--- /dev/null
+++ b/test/test_threaded.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# This file is part of pySerial - Cross platform serial port support for Python
+# (C) 2016 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+"""\
+Test serial.threaded related functionality.
+"""
+
+import os
+import unittest
+import serial
+import serial.threaded
+import time
+
+
+# on which port should the tests be performed:
+PORT = 'loop://'
+
+class Test_threaded(unittest.TestCase):
+ """Test serial.threaded related functionality"""
+
+ def test_line_reader(self):
+ """simple test of line reader class"""
+
+ class TestLines(serial.threaded.LineReader):
+ def __init__(self):
+ super(TestLines, self).__init__()
+ self.received_lines = []
+
+ def handle_line(self, data):
+ self.received_lines.append(data)
+
+ ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
+ with serial.threaded.ReaderThread(ser, TestLines) as protocol:
+ protocol.write_line('hello')
+ protocol.write_line('world')
+ time.sleep(1)
+ self.assertEqual(protocol.received_lines, ['hello', 'world'])
+
+ def test_framed_packet(self):
+ """simple test of line reader class"""
+
+ class TestFramedPacket(serial.threaded.FramedPacket):
+ def __init__(self):
+ super(TestFramedPacket, self).__init__()
+ self.received_packets = []
+
+ def handle_packet(self, packet):
+ self.received_packets.append(packet)
+
+ def send_packet(self, packet):
+ self.transport.write(self.START)
+ self.transport.write(packet)
+ self.transport.write(self.STOP)
+
+ ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
+ with serial.threaded.ReaderThread(ser, TestFramedPacket) as protocol:
+ protocol.send_packet(b'1')
+ protocol.send_packet(b'2')
+ protocol.send_packet(b'3')
+ time.sleep(1)
+ self.assertEqual(protocol.received_packets, [b'1', b'2', b'3'])
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.stdout.write("Testing port: {!r}\n".format(PORT))
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_timeout_class.py b/test/test_timeout_class.py
new file mode 100644
index 0000000..37c38b1
--- /dev/null
+++ b/test/test_timeout_class.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+#
+# This file is part of pySerial - Cross platform serial port support for Python
+# (C) 2016 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+"""
+Test Timeout helper class.
+"""
+import sys
+import unittest
+import time
+from serial import serialutil
+
+
+class TestTimeoutClass(unittest.TestCase):
+ """Test the Timeout class"""
+
+ def test_simple_timeout(self):
+ """Test simple timeout"""
+ t = serialutil.Timeout(2)
+ self.assertFalse(t.expired())
+ self.assertTrue(t.time_left() > 0)
+ time.sleep(2.1)
+ self.assertTrue(t.expired())
+ self.assertEqual(t.time_left(), 0)
+
+ def test_non_blocking(self):
+ """Test nonblocking case (0)"""
+ t = serialutil.Timeout(0)
+ self.assertTrue(t.is_non_blocking)
+ self.assertFalse(t.is_infinite)
+ self.assertTrue(t.expired())
+
+ def test_blocking(self):
+ """Test no timeout (None)"""
+ t = serialutil.Timeout(None)
+ self.assertFalse(t.is_non_blocking)
+ self.assertTrue(t.is_infinite)
+ #~ self.assertFalse(t.expired())
+
+ def test_changing_clock(self):
+ """Test recovery from chaning clock"""
+ class T(serialutil.Timeout):
+ def TIME(self):
+ return test_time
+ test_time = 1000
+ t = T(10)
+ self.assertEqual(t.target_time, 1010)
+ self.assertFalse(t.expired())
+ self.assertTrue(t.time_left() > 0)
+ test_time = 100 # clock jumps way back
+ self.assertTrue(t.time_left() > 0)
+ self.assertTrue(t.time_left() <= 10)
+ self.assertEqual(t.target_time, 110)
+ test_time = 10000 # jump way forward
+ self.assertEqual(t.time_left(), 0) # if will expire immediately
+
+
+if __name__ == '__main__':
+ sys.stdout.write(__doc__)
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/test/test_util.py b/test/test_util.py
new file mode 100644
index 0000000..5bf8e60
--- /dev/null
+++ b/test/test_util.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+#
+# This file is part of pySerial - Cross platform serial port support for Python
+# (C) 2016 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+"""\
+Tests for utility functions of serualutil.
+"""
+
+import os
+import unittest
+import serial
+
+
+class Test_util(unittest.TestCase):
+ """Test serial utility functions"""
+
+ def test_to_bytes(self):
+ self.assertEqual(serial.to_bytes([1, 2, 3]), b'\x01\x02\x03')
+ self.assertEqual(serial.to_bytes(b'\x01\x02\x03'), b'\x01\x02\x03')
+ self.assertEqual(serial.to_bytes(bytearray([1,2,3])), b'\x01\x02\x03')
+ # unicode is not supported test. use decode() instead of u'' syntax to be
+ # compatible to Python 3.x < 3.4
+ self.assertRaises(TypeError, serial.to_bytes, b'hello'.decode('utf-8'))
+
+ def test_iterbytes(self):
+ self.assertEqual(list(serial.iterbytes(b'\x01\x02\x03')), [b'\x01', b'\x02', b'\x03'])
+
+
+if __name__ == '__main__':
+ import sys
+ sys.stdout.write(__doc__)
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()