# mainloop/sockbuf.py -- a buffering, non-blocking socket I/O state machine
#
# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
import logging
'''A buffering, non-blocking I/O state machine for sockets.
The state machine is given an open socket. It reads from the socket,
and writes to it, when it can do so without blocking. A maximum size
for the read buffer can be set: the state machine will stop reading
if the buffer becomes full. This avoids the problem of an excessively
large buffer.
The state machine generates events to indicate that the buffer contains
data or that the end of the file for reading has been reached. An event
is also generated if there is an error while doing I/O with the socket.
* SocketError: an error has occurred
* SocketBufferNewData: socket buffer has received new data; the data
is available as the ``data`` attribute
* SocketBufferEof: socket buffer has reached EOF for reading, but
still writes anything in the write buffer (or anything that gets added
to the write buffer)
* SocketBufferClosed: socket is now closed
The state machine starts shutting down when ``close`` method is called,
but continues to operate in write-only mode until the write buffer has
been emptied.
'''
from socketsrc import (SocketError, SocketReadable, SocketWriteable,
SocketEventSource)
from sm import StateMachine
from stringbuffer import StringBuffer
class SocketBufferNewData(object):
'''Socket buffer has received new data.'''
def __init__(self, data):
self.data = data
class SocketBufferEof(object):
'''Socket buffer has reached end of file when reading.
Note that the socket buffer may still be available for writing.
However, no more new data will be read.
'''
class SocketBufferClosed(object):
'''Socket buffer has closed its socket.'''
class _Close(object): pass
class _WriteBufferIsEmpty(object): pass
class _WriteBufferNotEmpty(object): pass
class SocketBuffer(StateMachine):
def __init__(self, sock, max_buffer):
StateMachine.__init__(self, 'reading')
self._sock = sock
self._max_buffer = max_buffer
def __repr__(self):
return '' % (
id(self), self._sock, self._max_buffer)
def setup(self):
src = self._src = SocketEventSource(self._sock)
src.stop_writing() # We'll start writing when we need to.
self.mainloop.add_event_source(src)
self._wbuf = StringBuffer()
spec = [
# state, source, event_class, new_state, callback
('reading', src, SocketReadable, 'reading', self._fill),
('reading', self, _WriteBufferNotEmpty, 'rw',
self._start_writing),
('reading', self, SocketBufferEof, 'idle', None),
('reading', self, _Close, None, self._really_close),
('rw', src, SocketReadable, 'rw', self._fill),
('rw', src, SocketWriteable, 'rw', self._flush),
('rw', self, _WriteBufferIsEmpty, 'reading', self._stop_writing),
('rw', self, SocketBufferEof, 'w', None),
('rw', self, _Close, 'wc', None),
('idle', self, _WriteBufferNotEmpty, 'w', self._start_writing),
('idle', self, _Close, None, self._really_close),
('w', src, SocketWriteable, 'w', self._flush),
('w', self, _WriteBufferIsEmpty, 'idle', self._stop_writing),
('wc', src, SocketWriteable, 'wc', self._flush),
('wc', self, _WriteBufferIsEmpty, None, self._really_close),
]
self.add_transitions(spec)
def write(self, data):
'''Put data into write queue.'''
was_empty = len(self._wbuf) == 0
self._wbuf.add(data)
if was_empty and len(self._wbuf) > 0:
self._start_writing(None, None)
self.mainloop.queue_event(self, _WriteBufferNotEmpty())
def close(self):
'''Tell state machine to terminate.'''
self.mainloop.queue_event(self, _Close())
def _report_error(self, event_source, event):
logging.error(str(event))
def _fill(self, event_source, event):
try:
data = event.sock.read(self._max_buffer)
except (IOError, OSError) as e:
logging.debug(
'%s: _fill(): Exception %s from sock.read()', self, e)
return [SocketError(event.sock, e)]
if data:
self.mainloop.queue_event(self, SocketBufferNewData(data))
else:
event_source.stop_reading()
self.mainloop.queue_event(self, SocketBufferEof())
def _really_close(self, event_source, event):
self._src.close()
self.mainloop.queue_event(self, SocketBufferClosed())
def _flush(self, event_source, event):
max_write = 1024**2
data = self._wbuf.read(max_write)
try:
n = event.sock.write(data)
except (IOError, OSError) as e:
logging.debug(
'%s: _flush(): Exception %s from sock.write()', self, e)
return [SocketError(event.sock, e)]
self._wbuf.remove(n)
if len(self._wbuf) == 0:
self.mainloop.queue_event(self, _WriteBufferIsEmpty())
def _start_writing(self, event_source, event):
self._src.start_writing()
def _stop_writing(self, event_source, event):
self._src.stop_writing()