summaryrefslogtreecommitdiff
path: root/lib/py
diff options
context:
space:
mode:
authorYiyang Zhou <yiyangzhou123@ucla.edu>2021-08-04 21:55:04 +0800
committerJens Geyer <Jens-G@users.noreply.github.com>2022-04-23 10:30:46 +0200
commit88a45ac77518eafb57db08938ecdf38c5fcf7a31 (patch)
tree8ef05402b7c300a3c4b1c3a2240489cdcc010d9b /lib/py
parentb664cfe2533e4bbf00fd5e7e0211bf7161ee2a04 (diff)
downloadthrift-88a45ac77518eafb57db08938ecdf38c5fcf7a31.tar.gz
THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available
Client: Python
Diffstat (limited to 'lib/py')
-rw-r--r--lib/py/src/server/TNonblockingServer.py46
1 files changed, 44 insertions, 2 deletions
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
index ac0649651..fdf6779ad 100644
--- a/lib/py/src/server/TNonblockingServer.py
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -253,6 +253,7 @@ class TNonblockingServer(object):
self._read, self._write = socket.socketpair()
self.prepared = False
self._stop = False
+ self.poll = select.poll() if hasattr(select, 'poll') else None
def setNumThreads(self, num):
"""Set the number of worker threads that should be created."""
@@ -318,13 +319,53 @@ class TNonblockingServer(object):
else:
return select.select(readable, writable, readable) + (True,)
+ def _poll_select(self):
+ """Does poll on open connections, if available."""
+ remaining = []
+
+ self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM)
+ self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM)
+
+ for i, connection in list(self.clients.items()):
+ if connection.is_readable():
+ self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL)
+ if connection.remaining or connection.received:
+ remaining.append(connection.fileno())
+ if connection.is_writeable():
+ self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM)
+ if connection.is_closed():
+ try:
+ self.poll.unregister(i)
+ except KeyError:
+ logger.debug("KeyError in unregistering connections...")
+ del self.clients[i]
+ if remaining:
+ return remaining, [], [], False
+
+ rlist = []
+ wlist = []
+ xlist = []
+ pollres = self.poll.poll()
+ for fd, event in pollres:
+ if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
+ xlist.append(fd)
+ elif event & (select.POLLOUT | select.POLLWRNORM):
+ wlist.append(fd)
+ elif event & (select.POLLIN | select.POLLRDNORM):
+ rlist.append(fd)
+ else: # should be impossible
+ logger.debug("reached an impossible state in _poll_select")
+ xlist.append(fd)
+
+ return rlist, wlist, xlist, True
+
def handle(self):
"""Handle requests.
WARNING! You must call prepare() BEFORE calling handle()
"""
assert self.prepared, "You have to call prepare before handle"
- rset, wset, xset, selected = self._select()
+ rset, wset, xset, selected = self._select() if not self.poll else self._poll_select()
for readable in rset:
if readable == self._read.fileno():
# don't care i just need to clean readable flag
@@ -343,6 +384,8 @@ class TNonblockingServer(object):
connection.read()
if connection.received:
connection.status = WAIT_PROCESS
+ if self.poll:
+ self.poll.unregister(connection.fileno())
msg = connection.received.popleft()
itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
otransport = TTransport.TMemoryBuffer()
@@ -354,7 +397,6 @@ class TNonblockingServer(object):
self.clients[writeable].write()
for oob in xset:
self.clients[oob].close()
- del self.clients[oob]
def close(self):
"""Closes the server."""