summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2014-06-10 14:15:27 -0700
committerSamuel Merritt <sam@swiftstack.com>2014-09-18 16:02:47 -0700
commit7d0e5ebe690bf3cf41ccd970281d532a23284e58 (patch)
tree69d5bc454abb0dc16e4d035f16d1b37b59533d40
parenta81b2d2c744b1f227d066957bdfb09b3526b813c (diff)
downloadswift-7d0e5ebe690bf3cf41ccd970281d532a23284e58.tar.gz
Zero-copy object-server GET responses with splice()
This commit lets the object server use splice() and tee() to move data from disk to the network without ever copying it into user space. Requires Linux. Sorry, FreeBSD folks. You still have the old mechanism, as does anyone who doesn't want to use splice. This requires a relatively recent kernel (2.6.38+) to work, which includes the two most recent Ubuntu LTS releases (Precise and Trusty) as well as RHEL 7. However, it excludes Lucid and RHEL 6. On those systems, setting "splice = on" will result in warnings in the logs but no actual use of splice. Note that this only applies to GET responses without Range headers. It can easily be extended to single-range GET requests, but this commit leaves that for future work. Same goes for PUT requests, or at least non-chunked ones. On some real hardware I had laying around (not a VM), this produced a 37% reduction in CPU usage for GETs made directly to the object server. Measurements were done by looking at /proc/<pid>/stat, specifically the utime and stime fields (user and kernel CPU jiffies, respectively). Note: There is a Python module called "splicetee" available on PyPi, but it's licensed under the GPL, so it cannot easily be added to OpenStack's requirements. That's why this patch uses ctypes instead. Also fixed a long-standing annoyance in FakeLogger: >>> fake_logger.warn('stuff') >>> fake_logger.get_lines_for_level('warn') [] >>> This, of course, is because the correct log level is 'warning'. Now you get a KeyError if you call get_lines_for_level with a bogus log level. Change-Id: Ic6d6b833a5b04ca2019be94b1b90d941929d21c8
-rw-r--r--etc/object-server.conf-sample7
-rw-r--r--swift/common/utils.py163
-rw-r--r--swift/obj/diskfile.py174
-rw-r--r--swift/obj/server.py67
-rw-r--r--test/unit/__init__.py12
-rw-r--r--test/unit/common/test_container_sync_realms.py16
-rw-r--r--test/unit/common/test_utils.py3
-rw-r--r--test/unit/obj/test_diskfile.py57
-rwxr-xr-xtest/unit/obj/test_server.py132
9 files changed, 610 insertions, 21 deletions
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 678f2cfae..4f6e133f1 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -127,6 +127,13 @@ use = egg:swift#object
# an abort to occur.
# replication_failure_threshold = 100
# replication_failure_ratio = 1.0
+#
+# Use splice() for zero-copy object GETs. This requires Linux kernel
+# version 3.0 or greater. If you set "splice = yes" but the kernel
+# does not support it, error messages will appear in the object server
+# logs at startup, but your object servers should continue to function.
+#
+# splice = no
[filter:healthcheck]
use = egg:swift#healthcheck
diff --git a/swift/common/utils.py b/swift/common/utils.py
index e37dc34c6..6681cf712 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -84,6 +84,11 @@ SysLogHandler.priority_map['NOTICE'] = 'notice'
# These are lazily pulled from libc elsewhere
_sys_fallocate = None
_posix_fadvise = None
+_libc_socket = None
+_libc_bind = None
+_libc_accept = None
+_libc_splice = None
+_libc_tee = None
# If set to non-zero, fallocate routines will fail based on free space
# available being at or below this amount, in bytes.
@@ -97,6 +102,13 @@ HASH_PATH_PREFIX = ''
SWIFT_CONF_FILE = '/etc/swift/swift.conf'
+# These constants are Linux-specific, and Python doesn't seem to know
+# about them. We ask anyway just in case that ever gets fixed.
+#
+# The values were copied from the Linux 3.0 kernel headers.
+AF_ALG = getattr(socket, 'AF_ALG', 38)
+F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
+
class InvalidHashPathConfigError(ValueError):
@@ -292,16 +304,22 @@ def validate_configuration():
sys.exit("Error: %s" % e)
-def load_libc_function(func_name, log_error=True):
+def load_libc_function(func_name, log_error=True,
+ fail_if_missing=False):
"""
Attempt to find the function in libc, otherwise return a no-op func.
:param func_name: name of the function to pull from libc.
+ :param log_error: log an error when a function can't be found
+ :param fail_if_missing: raise an exception when a function can't be found.
+ Default behavior is to return a no-op function.
"""
try:
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
return getattr(libc, func_name)
except AttributeError:
+ if fail_if_missing:
+ raise
if log_error:
logging.warn(_("Unable to locate %s in libc. Leaving as a "
"no-op."), func_name)
@@ -3112,3 +3130,146 @@ def parse_content_disposition(header):
attrs = attrs[len(m.group(0)):]
attributes[m.group(1)] = m.group(2).strip('"')
return header, attributes
+
+
+class sockaddr_alg(ctypes.Structure):
+ _fields_ = [("salg_family", ctypes.c_ushort),
+ ("salg_type", ctypes.c_ubyte * 14),
+ ("salg_feat", ctypes.c_uint),
+ ("salg_mask", ctypes.c_uint),
+ ("salg_name", ctypes.c_ubyte * 64)]
+
+
+_bound_md5_sockfd = None
+
+
+def get_md5_socket():
+ """
+ Get an MD5 socket file descriptor. One can MD5 data with it by writing it
+ to the socket with os.write, then os.read the 16 bytes of the checksum out
+ later.
+
+ NOTE: It is the caller's responsibility to ensure that os.close() is
+ called on the returned file descriptor. This is a bare file descriptor,
+ not a Python object. It doesn't close itself.
+ """
+
+ # Linux's AF_ALG sockets work like this:
+ #
+ # First, initialize a socket with socket() and bind(). This tells the
+ # socket what algorithm to use, as well as setting up any necessary bits
+ # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the
+ # algorithm name.
+ #
+ # Second, to hash some data, get a second socket by calling accept() on
+ # the first socket. Write data to the socket, then when finished, read the
+ # checksum from the socket and close it. This lets you checksum multiple
+ # things without repeating all the setup code each time.
+ #
+ # Since we only need to bind() one socket, we do that here and save it for
+ # future re-use. That way, we only use one file descriptor to get an MD5
+ # socket instead of two, and we also get to save some syscalls.
+
+ global _bound_md5_sockfd
+ global _libc_socket
+ global _libc_bind
+ global _libc_accept
+
+ if _libc_accept is None:
+ _libc_accept = load_libc_function('accept', fail_if_missing=True)
+ if _libc_socket is None:
+ _libc_socket = load_libc_function('socket', fail_if_missing=True)
+ if _libc_bind is None:
+ _libc_bind = load_libc_function('bind', fail_if_missing=True)
+
+ # Do this at first call rather than at import time so that we don't use a
+ # file descriptor on systems that aren't using any MD5 sockets.
+ if _bound_md5_sockfd is None:
+ sockaddr_setup = sockaddr_alg(
+ AF_ALG,
+ (ord('h'), ord('a'), ord('s'), ord('h'), 0),
+ 0, 0,
+ (ord('m'), ord('d'), ord('5'), 0))
+ hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG),
+ ctypes.c_int(socket.SOCK_SEQPACKET),
+ ctypes.c_int(0))
+ if hash_sockfd < 0:
+ raise IOError(ctypes.get_errno(),
+ "Failed to initialize MD5 socket")
+
+ bind_result = _libc_bind(ctypes.c_int(hash_sockfd),
+ ctypes.pointer(sockaddr_setup),
+ ctypes.c_int(ctypes.sizeof(sockaddr_alg)))
+ if bind_result < 0:
+ os.close(hash_sockfd)
+ raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket")
+
+ _bound_md5_sockfd = hash_sockfd
+
+ md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0)
+ if md5_sockfd < 0:
+ raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket")
+
+ return md5_sockfd
+
+
+# Flags for splice() and tee()
+SPLICE_F_MOVE = 1
+SPLICE_F_NONBLOCK = 2
+SPLICE_F_MORE = 4
+SPLICE_F_GIFT = 8
+
+
+def splice(fd_in, off_in, fd_out, off_out, length, flags):
+ """
+ Calls splice - a Linux-specific syscall for zero-copy data movement.
+
+ On success, returns the number of bytes moved.
+
+ On failure where errno is EWOULDBLOCK, returns None.
+
+ On all other failures, raises IOError.
+ """
+ global _libc_splice
+ if _libc_splice is None:
+ _libc_splice = load_libc_function('splice', fail_if_missing=True)
+
+ ret = _libc_splice(ctypes.c_int(fd_in), ctypes.c_long(off_in),
+ ctypes.c_int(fd_out), ctypes.c_long(off_out),
+ ctypes.c_int(length), ctypes.c_int(flags))
+ if ret < 0:
+ err = ctypes.get_errno()
+ if err == errno.EWOULDBLOCK:
+ return None
+ else:
+ raise IOError(err, "splice() failed: %s" % os.strerror(err))
+ return ret
+
+
+def tee(fd_in, fd_out, length, flags):
+ """
+ Calls tee - a Linux-specific syscall to let pipes share data.
+
+ On success, returns the number of bytes "copied".
+
+ On failure, raises IOError.
+ """
+ global _libc_tee
+ if _libc_tee is None:
+ _libc_tee = load_libc_function('tee', fail_if_missing=True)
+
+ ret = _libc_tee(ctypes.c_int(fd_in), ctypes.c_int(fd_out),
+ ctypes.c_int(length), ctypes.c_int(flags))
+ if ret < 0:
+ err = ctypes.get_errno()
+ raise IOError(err, "tee() failed: %s" % os.strerror(err))
+ return ret
+
+
+def system_has_splice():
+ global _libc_splice
+ try:
+ _libc_splice = load_libc_function('splice', fail_if_missing=True)
+ return True
+ except AttributeError:
+ return False
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 896225a97..62b38f8a2 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -32,6 +32,7 @@ are also not considered part of the backend API.
import cPickle as pickle
import errno
+import fcntl
import os
import time
import uuid
@@ -46,6 +47,7 @@ from collections import defaultdict
from xattr import getxattr, setxattr
from eventlet import Timeout
+from eventlet.hubs import trampoline
from swift import gettext_ as _
from swift.common.constraints import check_mount
@@ -53,7 +55,9 @@ from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
- config_true_value, listdir, split_path, ismount, remove_file
+ config_true_value, listdir, split_path, ismount, remove_file, \
+ get_md5_socket, system_has_splice, splice, tee, SPLICE_F_MORE, \
+ F_SETPIPE_SZ
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir, \
@@ -62,10 +66,12 @@ from swift.common.swob import multi_range_iterator
from swift.common.storage_policy import get_policy_string, POLICIES
from functools import partial
+
PICKLE_PROTOCOL = 2
ONE_WEEK = 604800
HASH_FILE = 'hashes.pkl'
METADATA_KEY = 'user.swift.metadata'
+DROP_CACHE_WINDOW = 1024 * 1024
# These are system-set metadata keys that cannot be changed with a POST.
# They should be lowercase.
DATAFILE_SYSTEM_META = set('content-length content-type deleted etag'.split())
@@ -75,6 +81,7 @@ TMP_BASE = 'tmp'
get_data_dir = partial(get_policy_string, DATADIR_BASE)
get_async_dir = partial(get_policy_string, ASYNCDIR_BASE)
get_tmp_dir = partial(get_policy_string, TMP_BASE)
+MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e'
def read_metadata(fd):
@@ -498,6 +505,37 @@ class DiskFileManager(object):
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=threads_per_disk))
+ self.use_splice = False
+ self.pipe_size = None
+
+ splice_available = system_has_splice()
+
+ conf_wants_splice = config_true_value(conf.get('splice', 'no'))
+ # If the operator wants zero-copy with splice() but we don't have the
+ # requisite kernel support, complain so they can go fix it.
+ if conf_wants_splice and not splice_available:
+ self.logger.warn(
+ "Use of splice() requested (config says \"splice = %s\"), "
+ "but the system does not support it. "
+ "splice() will not be used." % conf.get('splice'))
+ elif conf_wants_splice and splice_available:
+ try:
+ sockfd = get_md5_socket()
+ os.close(sockfd)
+ except IOError as err:
+ # AF_ALG socket support was introduced in kernel 2.6.38; on
+ # systems with older kernels (or custom-built kernels lacking
+ # AF_ALG support), we can't use zero-copy.
+ if err.errno != errno.EAFNOSUPPORT:
+ raise
+ self.logger.warn("MD5 sockets not supported. "
+ "splice() will not be used.")
+ else:
+ self.use_splice = True
+ with open('/proc/sys/fs/pipe-max-size') as f:
+ max_pipe_size = int(f.read())
+ self.pipe_size = min(max_pipe_size, self.disk_chunk_size)
+
def construct_dev_path(self, device):
"""
Construct the path to a device without checking if it is mounted.
@@ -564,7 +602,9 @@ class DiskFileManager(object):
raise DiskFileDeviceUnavailable()
return DiskFile(self, dev_path, self.threadpools[device],
partition, account, container, obj,
- policy_idx=policy_idx, **kwargs)
+ policy_idx=policy_idx,
+ use_splice=self.use_splice, pipe_size=self.pipe_size,
+ **kwargs)
def object_audit_location_generator(self, device_dirs=None):
return object_audit_location_generator(self.devices, self.mount_check,
@@ -830,11 +870,13 @@ class DiskFileReader(object):
:param device_path: on-disk device path, used when quarantining an obj
:param logger: logger caller wants this object to use
:param quarantine_hook: 1-arg callable called w/reason when quarantined
+ :param use_splice: if true, use zero-copy splice() to send data
+ :param pipe_size: size of pipe buffer used in zero-copy operations
:param keep_cache: should resulting reads be kept in the buffer cache
"""
def __init__(self, fp, data_file, obj_size, etag, threadpool,
disk_chunk_size, keep_cache_size, device_path, logger,
- quarantine_hook, keep_cache=False):
+ quarantine_hook, use_splice, pipe_size, keep_cache=False):
# Parameter tracking
self._fp = fp
self._data_file = data_file
@@ -845,6 +887,8 @@ class DiskFileReader(object):
self._device_path = device_path
self._logger = logger
self._quarantine_hook = quarantine_hook
+ self._use_splice = use_splice
+ self._pipe_size = pipe_size
if keep_cache:
# Caller suggests we keep this in cache, only do it if the
# object's size is less than the maximum.
@@ -857,6 +901,7 @@ class DiskFileReader(object):
self._bytes_read = 0
self._started_at_0 = False
self._read_to_eof = False
+ self._md5_of_sent_bytes = None
self._suppress_file_closing = False
self._quarantined_dir = None
@@ -877,7 +922,7 @@ class DiskFileReader(object):
if self._iter_etag:
self._iter_etag.update(chunk)
self._bytes_read += len(chunk)
- if self._bytes_read - dropped_cache > (1024 * 1024):
+ if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
self._drop_cache(self._fp.fileno(), dropped_cache,
self._bytes_read - dropped_cache)
dropped_cache = self._bytes_read
@@ -891,6 +936,109 @@ class DiskFileReader(object):
if not self._suppress_file_closing:
self.close()
+ def can_zero_copy_send(self):
+ return self._use_splice
+
+ def zero_copy_send(self, wsockfd):
+ """
+ Does some magic with splice() and tee() to move stuff from disk to
+ network without ever touching userspace.
+
+ :param wsockfd: file descriptor (integer) of the socket out which to
+ send data
+ """
+ # Note: if we ever add support for zero-copy ranged GET responses,
+ # we'll have to make this conditional.
+ self._started_at_0 = True
+
+ rfd = self._fp.fileno()
+ client_rpipe, client_wpipe = os.pipe()
+ hash_rpipe, hash_wpipe = os.pipe()
+ md5_sockfd = get_md5_socket()
+
+ # The actual amount allocated to the pipe may be rounded up to the
+ # nearest multiple of the page size. If we have the memory allocated,
+ # we may as well use it.
+ #
+ # Note: this will raise IOError on failure, so we don't bother
+ # checking the return value.
+ pipe_size = fcntl.fcntl(client_rpipe, F_SETPIPE_SZ, self._pipe_size)
+ fcntl.fcntl(hash_rpipe, F_SETPIPE_SZ, pipe_size)
+
+ dropped_cache = 0
+ self._bytes_read = 0
+ try:
+ while True:
+ # Read data from disk to pipe
+ bytes_in_pipe = self._threadpool.run_in_thread(
+ splice, rfd, 0, client_wpipe, 0, pipe_size, 0)
+ if bytes_in_pipe == 0:
+ self._read_to_eof = True
+ self._drop_cache(rfd, dropped_cache,
+ self._bytes_read - dropped_cache)
+ break
+ self._bytes_read += bytes_in_pipe
+
+ # "Copy" data from pipe A to pipe B (really just some pointer
+ # manipulation in the kernel, not actual copying).
+ bytes_copied = tee(client_rpipe, hash_wpipe, bytes_in_pipe, 0)
+ if bytes_copied != bytes_in_pipe:
+ # We teed data between two pipes of equal size, and the
+ # destination pipe was empty. If, somehow, the destination
+ # pipe was full before all the data was teed, we should
+ # fail here. If we don't raise an exception, then we will
+ # have the incorrect MD5 hash once the object has been
+ # sent out, causing a false-positive quarantine.
+ raise Exception("tee() failed: tried to move %d bytes, "
+ "but only moved %d" %
+ (bytes_in_pipe, bytes_copied))
+ # Take the data and feed it into an in-kernel MD5 socket. The
+ # MD5 socket hashes data that is written to it. Reading from
+ # it yields the MD5 checksum of the written data.
+ #
+ # Note that we don't have to worry about splice() returning
+ # None here (which happens on EWOULDBLOCK); we're splicing
+ # $bytes_in_pipe bytes from a pipe with exactly that many
+ # bytes in it, so read won't block, and we're splicing it into
+ # an MD5 socket, which synchronously hashes any data sent to
+ # it, so writing won't block either.
+ hashed = splice(hash_rpipe, 0, md5_sockfd, 0,
+ bytes_in_pipe, SPLICE_F_MORE)
+ if hashed != bytes_in_pipe:
+ raise Exception("md5 socket didn't take all the data? "
+ "(tried to write %d, but wrote %d)" %
+ (bytes_in_pipe, hashed))
+
+ while bytes_in_pipe > 0:
+ sent = splice(client_rpipe, 0, wsockfd, 0,
+ bytes_in_pipe, 0)
+ if sent is None: # would have blocked
+ trampoline(wsockfd, write=True)
+ else:
+ bytes_in_pipe -= sent
+
+ if self._bytes_read - dropped_cache > DROP_CACHE_WINDOW:
+ self._drop_cache(rfd, dropped_cache,
+ self._bytes_read - dropped_cache)
+ dropped_cache = self._bytes_read
+ finally:
+ # Linux MD5 sockets return '00000000000000000000000000000000' for
+ # the checksum if you didn't write any bytes to them, instead of
+ # returning the correct value.
+ if self._bytes_read > 0:
+ bin_checksum = os.read(md5_sockfd, 16)
+ hex_checksum = ''.join("%02x" % ord(c) for c in bin_checksum)
+ else:
+ hex_checksum = MD5_OF_EMPTY_STRING
+ self._md5_of_sent_bytes = hex_checksum
+
+ os.close(client_rpipe)
+ os.close(client_wpipe)
+ os.close(hash_rpipe)
+ os.close(hash_wpipe)
+ os.close(md5_sockfd)
+ self.close()
+
def app_iter_range(self, start, stop):
"""Returns an iterator over the data file for range (start, stop)"""
if start or start == 0:
@@ -942,15 +1090,18 @@ class DiskFileReader(object):
def _handle_close_quarantine(self):
"""Check if file needs to be quarantined"""
+ if self._iter_etag and not self._md5_of_sent_bytes:
+ self._md5_of_sent_bytes = self._iter_etag.hexdigest()
+
if self._bytes_read != self._obj_size:
self._quarantine(
"Bytes read: %s, does not match metadata: %s" % (
self._bytes_read, self._obj_size))
- elif self._iter_etag and \
- self._etag != self._iter_etag.hexdigest():
+ elif self._md5_of_sent_bytes and \
+ self._etag != self._md5_of_sent_bytes:
self._quarantine(
"ETag %s and file's md5 %s do not match" % (
- self._etag, self._iter_etag.hexdigest()))
+ self._etag, self._md5_of_sent_bytes))
def close(self):
"""
@@ -998,17 +1149,21 @@ class DiskFile(object):
:param obj: object name for the object
:param _datadir: override the full datadir otherwise constructed here
:param policy_idx: used to get the data dir when constructing it here
+ :param use_splice: if true, use zero-copy splice() to send data
+ :param pipe_size: size of pipe buffer used in zero-copy operations
"""
def __init__(self, mgr, device_path, threadpool, partition,
account=None, container=None, obj=None, _datadir=None,
- policy_idx=0):
+ policy_idx=0, use_splice=False, pipe_size=None):
self._mgr = mgr
self._device_path = device_path
self._threadpool = threadpool or ThreadPool(nthreads=0)
self._logger = mgr.logger
self._disk_chunk_size = mgr.disk_chunk_size
self._bytes_per_sync = mgr.bytes_per_sync
+ self._use_splice = use_splice
+ self._pipe_size = pipe_size
if account and container and obj:
self._name = '/' + '/'.join((account, container, obj))
self._account = account
@@ -1377,7 +1532,8 @@ class DiskFile(object):
self._fp, self._data_file, int(self._metadata['Content-Length']),
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
self._mgr.keep_cache_size, self._device_path, self._logger,
- quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
+ use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
+ pipe_size=self._pipe_size, keep_cache=keep_cache)
# At this point the reader object is now responsible for closing
# the file pointer.
self._fp = None
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 0fa1d7622..fcac395c0 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -25,7 +25,7 @@ import math
from swift import gettext_ as _
from hashlib import md5
-from eventlet import sleep, Timeout
+from eventlet import sleep, wsgi, Timeout
from swift.common.utils import public, get_logger, \
config_true_value, timing_stats, replication, \
@@ -50,6 +50,19 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
+class EventletPlungerString(str):
+ """
+ Eventlet won't send headers until it's accumulated at least
+ eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we
+ want to send the response body behind Eventlet's back, perhaps with some
+ zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi
+ to force the headers out, so we use an EventletPlungerString to empty out
+ all of Eventlet's buffers.
+ """
+ def __len__(self):
+ return wsgi.MINIMUM_CHUNK_SIZE + 1
+
+
class ObjectController(object):
"""Implements the WSGI application for the Swift Object Server."""
@@ -710,7 +723,57 @@ class ObjectController(object):
slow = self.slow - trans_time
if slow > 0:
sleep(slow)
- return res(env, start_response)
+
+ # To be able to zero-copy send the object, we need a few things.
+ # First, we have to be responding successfully to a GET, or else we're
+ # not sending the object. Second, we have to be able to extract the
+ # socket file descriptor from the WSGI input object. Third, the
+ # diskfile has to support zero-copy send.
+ #
+ # There's a good chance that this could work for 206 responses too,
+ # but the common case is sending the whole object, so we'll start
+ # there.
+ if req.method == 'GET' and res.status_int == 200 and \
+ isinstance(env['wsgi.input'], wsgi.Input):
+ app_iter = getattr(res, 'app_iter', None)
+ checker = getattr(app_iter, 'can_zero_copy_send', None)
+ if checker and checker():
+ # For any kind of zero-copy thing like sendfile or splice, we
+ # need the file descriptor. Eventlet doesn't provide a clean
+ # way of getting that, so we resort to this.
+ wsock = env['wsgi.input'].get_socket()
+ wsockfd = wsock.fileno()
+
+ # Don't call zero_copy_send() until after we force the HTTP
+ # headers out of Eventlet and into the socket.
+ def zero_copy_iter():
+ # If possible, set TCP_CORK so that headers don't
+ # immediately go on the wire, but instead, wait for some
+ # response body to make the TCP frames as large as
+ # possible (and hence as few packets as possible).
+ #
+ # On non-Linux systems, we might consider TCP_NODELAY, but
+ # since the only known zero-copy-capable diskfile uses
+ # Linux-specific syscalls, we'll defer that work until
+ # someone needs it.
+ if hasattr(socket, 'TCP_CORK'):
+ wsock.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_CORK, 1)
+ yield EventletPlungerString()
+ try:
+ app_iter.zero_copy_send(wsockfd)
+ except Exception:
+ self.logger.exception("zero_copy_send() blew up")
+ raise
+ yield ''
+
+ # Get headers ready to go out
+ res(env, start_response)
+ return zero_copy_iter()
+ else:
+ return res(env, start_response)
+ else:
+ return res(env, start_response)
def global_conf_callback(preloaded_app_conf, global_conf):
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 505dfb0cb..b869c9a1e 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -376,7 +376,8 @@ class FakeLogger(logging.Logger):
def _clear(self):
self.log_dict = defaultdict(list)
- self.lines_dict = defaultdict(list)
+ self.lines_dict = {'critical': [], 'error': [], 'info': [],
+ 'warning': [], 'debug': []}
def _store_in(store_name):
def stub_fn(self, *args, **kwargs):
@@ -390,8 +391,17 @@ class FakeLogger(logging.Logger):
return stub_fn
def get_lines_for_level(self, level):
+ if level not in self.lines_dict:
+ raise KeyError(
+ "Invalid log level '%s'; valid levels are %s" %
+ (level,
+ ', '.join("'%s'" % lvl for lvl in sorted(self.lines_dict))))
return self.lines_dict[level]
+ def all_log_lines(self):
+ return dict((level, msgs) for level, msgs in self.lines_dict.items()
+ if len(msgs) > 0)
+
error = _store_and_log_in('error', logging.ERROR)
info = _store_and_log_in('info', logging.INFO)
warning = _store_and_log_in('warning', logging.WARNING)
diff --git a/test/unit/common/test_container_sync_realms.py b/test/unit/common/test_container_sync_realms.py
index cc300e780..1ce8d489b 100644
--- a/test/unit/common/test_container_sync_realms.py
+++ b/test/unit/common/test_container_sync_realms.py
@@ -28,7 +28,7 @@ class TestUtils(unittest.TestCase):
logger = FakeLogger()
csr = ContainerSyncRealms(unique, logger)
self.assertEqual(
- logger.lines_dict,
+ logger.all_log_lines(),
{'debug': [
"Could not load '%s': [Errno 2] No such file or directory: "
"'%s'" % (unique, unique)]})
@@ -45,7 +45,7 @@ class TestUtils(unittest.TestCase):
csr = ContainerSyncRealms(fpath, logger)
try:
self.assertEqual(
- logger.lines_dict,
+ logger.all_log_lines(),
{'error': [
"Could not load '%s': [Errno 13] Permission denied: "
"'%s'" % (fpath, fpath)]})
@@ -61,7 +61,7 @@ class TestUtils(unittest.TestCase):
logger = FakeLogger()
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
- self.assertEqual(logger.lines_dict, {})
+ self.assertEqual(logger.all_log_lines(), {})
self.assertEqual(csr.mtime_check_interval, 300)
self.assertEqual(csr.realms(), [])
@@ -73,7 +73,7 @@ class TestUtils(unittest.TestCase):
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
self.assertEqual(
- logger.lines_dict,
+ logger.all_log_lines(),
{'error': [
"Could not load '%s': File contains no section headers.\n"
"file: %s, line: 1\n"
@@ -92,7 +92,7 @@ cluster_dfw1 = http://dfw1.host/v1/
logger = FakeLogger()
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
- self.assertEqual(logger.lines_dict, {})
+ self.assertEqual(logger.all_log_lines(), {})
self.assertEqual(csr.mtime_check_interval, 300)
self.assertEqual(csr.realms(), ['US'])
self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
@@ -120,7 +120,7 @@ cluster_lon3 = http://lon3.host/v1/
logger = FakeLogger()
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
- self.assertEqual(logger.lines_dict, {})
+ self.assertEqual(logger.all_log_lines(), {})
self.assertEqual(csr.mtime_check_interval, 60)
self.assertEqual(sorted(csr.realms()), ['UK', 'US'])
self.assertEqual(csr.key('US'), '9ff3b71c849749dbaec4ccdd3cbab62b')
@@ -144,7 +144,7 @@ cluster_lon3 = http://lon3.host/v1/
logger = FakeLogger()
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
- self.assertEqual(logger.lines_dict, {})
+ self.assertEqual(logger.all_log_lines(), {})
self.assertEqual(csr.mtime_check_interval, 300)
self.assertEqual(csr.realms(), ['US'])
self.assertEqual(csr.key('US'), None)
@@ -163,7 +163,7 @@ mtime_check_interval = invalid
fpath = os.path.join(tempdir, fname)
csr = ContainerSyncRealms(fpath, logger)
self.assertEqual(
- logger.lines_dict,
+ logger.all_log_lines(),
{'error': [
"Error in '%s' with mtime_check_interval: invalid literal "
"for int() with base 10: 'invalid'" % fpath]})
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 6f91477d4..685bcfe4d 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -1503,6 +1503,9 @@ class TestUtils(unittest.TestCase):
utils.load_libc_function('printf')))
self.assert_(callable(
utils.load_libc_function('some_not_real_function')))
+ self.assertRaises(AttributeError,
+ utils.load_libc_function, 'some_not_real_function',
+ fail_if_missing=True)
def test_readconf(self):
conf = '''[section1]
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index b62dbf85c..281dbe61d 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -36,6 +36,7 @@ from eventlet import tpool
from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger)
+from nose import SkipTest
from swift.obj import diskfile
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, Timestamp
@@ -951,6 +952,18 @@ class TestDiskFileManager(unittest.TestCase):
lock_exc = err
self.assertTrue(lock_exc is None)
+ def test_missing_splice_warning(self):
+ logger = FakeLogger()
+ with mock.patch('swift.obj.diskfile.system_has_splice',
+ lambda: False):
+ self.conf['splice'] = 'yes'
+ mgr = diskfile.DiskFileManager(self.conf, logger)
+
+ warnings = logger.get_lines_for_level('warning')
+ self.assertTrue(len(warnings) > 0)
+ self.assertTrue('splice()' in warnings[-1])
+ self.assertFalse(mgr.use_splice)
+
@patch_policies
class TestDiskFile(unittest.TestCase):
@@ -2183,6 +2196,50 @@ class TestDiskFile(unittest.TestCase):
self.assertEquals(len(dl), 2)
self.assertTrue(exp_name in set(dl))
+ def _system_can_zero_copy(self):
+ if not utils.system_has_splice():
+ return False
+
+ try:
+ utils.get_md5_socket()
+ except IOError:
+ return False
+
+ return True
+
+ def test_zero_copy_cache_dropping(self):
+ if not self._system_can_zero_copy():
+ raise SkipTest("zero-copy support is missing")
+
+ self.conf['splice'] = 'on'
+ self.conf['keep_cache_size'] = 16384
+ self.conf['disk_chunk_size'] = 4096
+ self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
+
+ df = self._get_open_disk_file(fsize=16385)
+ reader = df.reader()
+ self.assertTrue(reader.can_zero_copy_send())
+ with mock.patch("swift.obj.diskfile.drop_buffer_cache") as dbc:
+ with mock.patch("swift.obj.diskfile.DROP_CACHE_WINDOW", 4095):
+ with open('/dev/null', 'w') as devnull:
+ reader.zero_copy_send(devnull.fileno())
+ self.assertEqual(len(dbc.mock_calls), 5)
+
+ def test_zero_copy_turns_off_when_md5_sockets_not_supported(self):
+ if not self._system_can_zero_copy():
+ raise SkipTest("zero-copy support is missing")
+
+ self.conf['splice'] = 'on'
+ with mock.patch('swift.obj.diskfile.get_md5_socket') as mock_md5sock:
+ mock_md5sock.side_effect = IOError(
+ errno.EAFNOSUPPORT, "MD5 socket busted")
+ df = self._get_open_disk_file(fsize=128)
+ reader = df.reader()
+ self.assertFalse(reader.can_zero_copy_send())
+
+ log_lines = self.df_mgr.logger.get_lines_for_level('warning')
+ self.assert_('MD5 sockets' in log_lines[-1])
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index e2137484e..85886d363 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -33,6 +33,7 @@ import itertools
import tempfile
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
+from eventlet.green import httplib
from nose import SkipTest
@@ -4373,5 +4374,136 @@ class TestObjectServer(unittest.TestCase):
resp.close()
+class TestZeroCopy(unittest.TestCase):
+ """Test the object server's zero-copy functionality"""
+
+ def _system_can_zero_copy(self):
+ if not utils.system_has_splice():
+ return False
+
+ try:
+ utils.get_md5_socket()
+ except IOError:
+ return False
+
+ return True
+
+ def setUp(self):
+ if not self._system_can_zero_copy():
+ raise SkipTest("zero-copy support is missing")
+
+ self.testdir = mkdtemp(suffix="obj_server_zero_copy")
+ mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
+
+ conf = {'devices': self.testdir,
+ 'mount_check': 'false',
+ 'splice': 'yes',
+ 'disk_chunk_size': '4096'}
+ self.object_controller = object_server.ObjectController(
+ conf, logger=debug_logger())
+ self.df_mgr = diskfile.DiskFileManager(
+ conf, self.object_controller.logger)
+
+ listener = listen(('localhost', 0))
+ port = listener.getsockname()[1]
+ self.wsgi_greenlet = spawn(
+ wsgi.server, listener, self.object_controller, NullLogger())
+
+ self.http_conn = httplib.HTTPConnection('localhost', port)
+ self.http_conn.connect()
+
+ def tearDown(self):
+ """Tear down for testing swift.object.server.ObjectController"""
+ self.wsgi_greenlet.kill()
+ rmtree(self.testdir)
+
+ def test_GET(self):
+ url_path = '/sda1/2100/a/c/o'
+
+ self.http_conn.request('PUT', url_path, 'obj contents',
+ {'X-Timestamp': '127082564.24709'})
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 201)
+ response.read()
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 200)
+ contents = response.read()
+ self.assertEqual(contents, 'obj contents')
+
+ def test_GET_big(self):
+ # Test with a large-ish object to make sure we handle full socket
+ # buffers correctly.
+ obj_contents = 'A' * 4 * 1024 * 1024 # 4 MiB
+ url_path = '/sda1/2100/a/c/o'
+
+ self.http_conn.request('PUT', url_path, obj_contents,
+ {'X-Timestamp': '1402600322.52126'})
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 201)
+ response.read()
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 200)
+ contents = response.read()
+ self.assertEqual(contents, obj_contents)
+
+ def test_quarantine(self):
+ obj_hash = hash_path('a', 'c', 'o')
+ url_path = '/sda1/2100/a/c/o'
+ ts = '1402601849.47475'
+
+ self.http_conn.request('PUT', url_path, 'obj contents',
+ {'X-Timestamp': ts})
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 201)
+ response.read()
+
+ # go goof up the file on disk
+ fname = os.path.join(self.testdir, 'sda1', 'objects', '2100',
+ obj_hash[-3:], obj_hash, ts + '.data')
+
+ with open(fname, 'rb+') as fh:
+ fh.write('XYZ')
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 200)
+ contents = response.read()
+ self.assertEqual(contents, 'XYZ contents')
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ # it was quarantined by the previous request
+ self.assertEqual(response.status, 404)
+ response.read()
+
+ def test_quarantine_on_well_formed_zero_byte_file(self):
+ # Make sure we work around an oddity in Linux's hash sockets
+ url_path = '/sda1/2100/a/c/o'
+ ts = '1402700497.71333'
+
+ self.http_conn.request(
+ 'PUT', url_path, '',
+ {'X-Timestamp': ts, 'Content-Length': '0'})
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 201)
+ response.read()
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 200)
+ contents = response.read()
+ self.assertEqual(contents, '')
+
+ self.http_conn.request('GET', url_path)
+ response = self.http_conn.getresponse()
+ self.assertEqual(response.status, 200) # still there
+ contents = response.read()
+ self.assertEqual(contents, '')
+
+
if __name__ == '__main__':
unittest.main()