author     Darrell Bishop <darrell@swiftstack.com>    2012-08-25 23:02:53 -0700
committer  Darrell Bishop <darrell@swiftstack.com>    2012-08-25 23:02:53 -0700
commit     2e89f09bc42a6ccd92dd17506c2306f72295fd7e (patch)
tree       dd9e17df78d7a4ac60669a47c3c44b7f135348be
parent     7335bea08e565064d7dcf95832a8539945deaf95 (diff)
download   swift-bench-2e89f09bc42a6ccd92dd17506c2306f72295fd7e.tar.gz
Can run swift-bench across multiple cores/servers.
You run one or more swift-bench-client processes like this:

  $ swift-bench-client 127.0.0.1 20001
  $ swift-bench-client 127.0.0.1 20002

Then you run swift-bench with a new option, --bench-clients (-b), which is
specified once for each swift-bench-client:

  $ swift-bench -b 127.0.0.1:20001 -b 127.0.0.1:20002

You get log lines from each client (interleaved) along with a final report
for all clients:

  127.0.0.1:20002 swift-bench-server 2012-08-25 22:44:06,148 INFO Auth version: 1.0
  127.0.0.1:20001 swift-bench-server 2012-08-25 22:44:06,148 INFO Auth version: 1.0
  127.0.0.1:20001 swift-bench-server 2012-08-25 22:44:12,249 INFO 83 PUTS [0 failures], 41.5/s
  127.0.0.1:20002 swift-bench-server 2012-08-25 22:44:14,430 INFO 74 PUTS [0 failures], 34.3/s
  ...
  127.0.0.1:20002 swift-bench-server 2012-08-25 22:45:18,942 INFO Auth version: 1.0
  127.0.0.1:20002 swift-bench-server 2012-08-25 22:45:20,946 INFO 238 DEL [2 failures], 118.9/s
  swift-bench 2012-08-25 22:45:27,549 INFO 2000 PUTS **FINAL** [0 failures], 56.8/s
  swift-bench 2012-08-25 22:45:27,550 INFO 30000 GETS **FINAL** [50 failures], 974.6/s
  swift-bench 2012-08-25 22:45:27,550 INFO 2000 DEL **FINAL** [20 failures], 237.1/s

The concurrency, PUT count, and GET count config settings are divided by the
number of bench_clients. In other words, the same volume of work is attempted
(vs. not specifying --bench-clients), but it can now span servers and CPU
cores.

Benchmark containers are created (if use_proxy = yes) and deleted (if
delete = yes), with appropriate concurrency, in the initiating swift-bench
process, not any of the swift-bench-client processes.

Change-Id: Idbf31a23093244ab357a9bf77e6031257774f24a
-rwxr-xr-x  bin/swift-bench           24
-rwxr-xr-x  bin/swift-bench-client    59
-rw-r--r--  swift/common/bench.py    198
3 files changed, 261 insertions, 20 deletions
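
To make the division of work concrete, here is a small standalone sketch (not
part of the commit) of the splitting arithmetic that
DistributedBenchController.__init__ applies in the bench.py hunk below, using
made-up settings of concurrency 10, 2000 PUTs, and 30000 GETs spread across
two bench clients:

# Illustration only: mirrors the max(1, value / len(clients)) loop added to
# DistributedBenchController.__init__ ('/' is integer division under the
# Python 2 this codebase targets; '//' is used here for clarity).
clients = ['127.0.0.1:20001', '127.0.0.1:20002']
conf = {'put_concurrency': 10, 'get_concurrency': 10, 'del_concurrency': 10,
        'num_objects': 2000, 'num_gets': 30000}

for key in ('put_concurrency', 'get_concurrency', 'del_concurrency',
            'num_objects', 'num_gets'):
    conf[key] = max(1, int(conf[key]) // len(clients))

for key in ('put_concurrency', 'num_objects', 'num_gets'):
    print('%s: %d per client' % (key, conf[key]))
# put_concurrency: 5 per client
# num_objects: 1000 per client
# num_gets: 15000 per client

With a single client the settings are unchanged, and the max(1, ...) floor
keeps a setting from rounding down to zero when there are more bench clients
than, say, configured concurrency.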
diff --git a/bin/swift-bench b/bin/swift-bench
index b69671a..edeb511 100755
--- a/bin/swift-bench
+++ b/bin/swift-bench
@@ -21,7 +21,8 @@ import signal
import uuid
from optparse import OptionParser
-from swift.common.bench import BenchController
+from swift.common.bench import (BenchController, DistributedBenchController,
+ create_containers, delete_containers)
from swift.common.utils import readconf, LogAdapter
# The defaults should be sufficient to run swift-bench on a SAIO
@@ -49,6 +50,8 @@ CONF_DEFAULTS = {
'devices': 'sdb1', # space-sep list
'log_level': 'INFO',
'timeout': '10',
+ 'auth_version': '1.0',
+ 'bench_clients': [],
}
SAIO_DEFAULTS = {
@@ -81,6 +84,13 @@ if __name__ == '__main__':
help='User name for obtaining an auth token')
parser.add_option('-K', '--key', dest='key',
help='Key for obtaining an auth token')
+ parser.add_option('-b', '--bench-clients', action='append',
+ metavar='<ip>:<port>',
+ help=('A string of the form "<ip>:<port>" which matches '
+ 'the arguments supplied to a swift-bench-client '
+ 'process. This argument must be specified '
+ 'once per swift-bench-client you want to '
+ 'utilize.'))
parser.add_option('-u', '--url', dest='url',
help='Storage URL')
parser.add_option('-c', '--concurrency', dest='concurrency',
@@ -125,6 +135,8 @@ if __name__ == '__main__':
options.put_concurrency = options.concurrency
options.get_concurrency = options.concurrency
options.del_concurrency = options.concurrency
+ options.containers = ['%s_%d' % (options.container_name, i)
+ for i in xrange(int(options.num_containers))]
def sigterm(signum, frame):
sys.exit('Termination signal received.')
@@ -145,5 +157,13 @@ if __name__ == '__main__':
'%(message)s')
loghandler.setFormatter(logformat)
- controller = BenchController(logger, options)
+ if options.use_proxy:
+ create_containers(logger, options)
+
+ controller_class = DistributedBenchController if options.bench_clients \
+ else BenchController
+ controller = controller_class(logger, options)
controller.run()
+
+ if options.delete:
+ delete_containers(logger, options)
diff --git a/bin/swift-bench-client b/bin/swift-bench-client
new file mode 100755
index 0000000..9473b45
--- /dev/null
+++ b/bin/swift-bench-client
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sys
+import signal
+from optparse import OptionParser
+
+from swift.common.bench import BenchServer
+from swift.common.utils import LogAdapter
+
+if __name__ == '__main__':
+ usage = "usage: %prog <ip> <port>"
+ usage += "\n\nRun a client for distributed swift-bench runs."
+ parser = OptionParser(usage=usage)
+ parser.add_option('-o', '--log-level', dest='log_level',
+ default='info',
+ help='Logging level (debug, info, etc)')
+
+ options, args = parser.parse_args()
+ if len(args) != 2:
+ parser.print_help()
+ sys.exit(1)
+
+ logger = logging.getLogger()
+ logger.setLevel({
+ 'debug': logging.DEBUG,
+ 'info': logging.INFO,
+ 'warning': logging.WARNING,
+ 'error': logging.ERROR,
+ 'critical': logging.CRITICAL}.get(
+ options.log_level.lower(), logging.INFO))
+ loghandler = logging.StreamHandler()
+ logger.addHandler(loghandler)
+ logger = LogAdapter(logger, 'swift-bench-client')
+ logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
+ '%(message)s')
+ loghandler.setFormatter(logformat)
+
+ def sigterm(signum, frame):
+ sys.exit('Termination signal received.')
+ signal.signal(signal.SIGTERM, sigterm)
+ signal.signal(signal.SIGINT, sigterm)
+
+ server = BenchServer(logger, args[0], args[1])
+ server.run()
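
For orientation, a minimal sketch (not part of the commit) of how a job
reaches one of these swift-bench-client processes: the initiator opens a TCP
connection, sends the conf dict as JSON, shuts down its write side so the
server sees EOF, and then reads the streamed log lines back. This mirrors
DistributedBenchController.do_run in the bench.py changes below; the conf
keys shown are illustrative and nowhere near a complete swift-bench conf.

import json
import socket

conf = {'put_concurrency': 5, 'get_concurrency': 5, 'del_concurrency': 5,
        'num_objects': 1000, 'num_gets': 15000, 'log_level': 'info'}

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 20001))   # a running swift-bench-client
s.sendall(json.dumps(conf))       # Python 3 would need .encode() here
s.shutdown(socket.SHUT_WR)        # EOF marks the end of the job description
for line in s.makefile('rb'):
    print(line.rstrip())          # swift-bench-server log lines, ending in **FINAL** reports
s.close()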
diff --git a/swift/common/bench.py b/swift/common/bench.py
index 3edfaec..89e8a21 100644
--- a/swift/common/bench.py
+++ b/swift/common/bench.py
@@ -13,21 +13,61 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
import sys
import uuid
import time
import random
import signal
+import socket
+import logging
from contextlib import contextmanager
+from optparse import Values
+import eventlet
import eventlet.pools
from eventlet.green.httplib import CannotSendRequest
-from swift.common.utils import TRUE_VALUES
+from swift.common.utils import TRUE_VALUES, LogAdapter
import swiftclient as client
from swift.common import direct_client
from swift.common.http import HTTP_CONFLICT
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+
+def _func_on_containers(logger, conf, concurrency_key, func):
+ """Run a function on each container with concurrency."""
+
+ bench = Bench(logger, conf, [])
+ pool = eventlet.GreenPool(int(getattr(conf, concurrency_key)))
+ for container in conf.containers:
+ pool.spawn_n(func, bench.url, bench.token, container)
+ pool.waitall()
+
+
+def delete_containers(logger, conf):
+ """Utility function to delete benchmark containers."""
+
+ def _deleter(url, token, container):
+ try:
+ client.delete_container(url, token, container)
+ except client.ClientException, e:
+ if e.http_status != HTTP_CONFLICT:
+ logger.warn("Unable to delete container '%s'. "
+ "Got http status '%d'." % (container, e.http_status))
+
+ _func_on_containers(logger, conf, 'del_concurrency', _deleter)
+
+
+def create_containers(logger, conf):
+ """Utility function to create benchmark containers."""
+
+ _func_on_containers(logger, conf, 'put_concurrency', client.put_container)
+
class ConnectionPool(eventlet.pools.Pool):
@@ -39,6 +79,62 @@ class ConnectionPool(eventlet.pools.Pool):
return client.http_connection(self.url)
+class BenchServer(object):
+ """
+ A BenchServer binds to an IP/port and listens for bench jobs. A bench
+ job consists of the normal conf "dict" encoded in JSON, terminated with an
+ EOF. The log level is at least INFO, but DEBUG may also be specified in
+ the conf dict.
+
+ The server will wait forever for jobs, running them one at a time.
+ """
+ def __init__(self, logger, bind_ip, bind_port):
+ self.logger = logger
+ self.bind_ip = bind_ip
+ self.bind_port = int(bind_port)
+
+ def run(self):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.logger.info('Binding to %s:%s', self.bind_ip, self.bind_port)
+ s.bind((self.bind_ip, self.bind_port))
+ s.listen(20)
+ while True:
+ client, address = s.accept()
+ self.logger.debug('Accepting connection from %s:%s', *address)
+ client_file = client.makefile('rb+', 1)
+ json_data = client_file.read()
+ conf = Values(json.loads(json_data))
+
+ self.logger.info(
+ 'Starting run for %s:%s [put/get/del_concurrency: %s/%s/%s, '
+ 'num_objects: %s, num_gets: %s]', address[0], address[1],
+ conf.put_concurrency, conf.get_concurrency,
+ conf.del_concurrency, conf.num_objects, conf.num_gets)
+
+ logger = logging.getLogger('bench-server')
+ level = logging.DEBUG if conf.log_level.lower() == 'debug' \
+ else logging.INFO
+ logger.setLevel(level)
+ loghandler = logging.StreamHandler(stream=client_file)
+ logformat = logging.Formatter(
+ '%(server)s %(asctime)s %(levelname)s %(message)s')
+ loghandler.setFormatter(logformat)
+ logger.addHandler(loghandler)
+ logger = LogAdapter(logger, 'swift-bench-server')
+
+ controller = BenchController(logger, conf)
+ try:
+ controller.run()
+ except socket.error:
+ logger.warning('Socket error', exc_info=1)
+
+ logger.logger.removeHandler(loghandler)
+ client_file.close()
+ client.close()
+
+ self.logger.info('...bench run completed; waiting for next run.')
+
+
class Bench(object):
def __init__(self, logger, conf, names):
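
The per-job logger set up in BenchServer.run above is what makes the
interleaved output shown in the commit message possible: a StreamHandler is
pointed at the accepted socket's file object, so every record the embedded
BenchController logs travels straight back to the initiating swift-bench
process. A tiny standalone sketch of that idea (not part of the commit), with
a StringIO standing in for the socket file and the %(server)s field omitted
since it normally comes from LogAdapter:

import logging
try:
    from StringIO import StringIO   # Python 2, as used by this codebase
except ImportError:
    from io import StringIO         # Python 3 fallback

stream = StringIO()                 # stands in for client_file above
logger = logging.getLogger('bench-server-demo')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
logger.addHandler(handler)

logger.info('83 PUTS [0 failures], 41.5/s')
print(stream.getvalue())            # this text is what crosses the socket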
@@ -64,8 +160,6 @@ class Bench(object):
self.account = conf.account
self.url = conf.url
self.ip, self.port = self.url.split('/')[2].split(':')
- self.containers = ['%s_%d' % (conf.container_name, i)
- for i in xrange(int(conf.num_containers))]
self.object_size = int(conf.object_size)
self.object_sources = conf.object_sources
@@ -129,6 +223,88 @@ class Bench(object):
return
+class DistributedBenchController(object):
+ """
+ This class manages a distributed swift-bench run. For this Controller
+ class to make sense, the conf.bench_clients list must contain at least one
+ entry.
+
+ The idea is to split the configured load between one or more
+ swift-bench-client processes, each of which uses eventlet for concurrency.
+ We deliberately take a simple, naive approach with these limitations:
+ 1) Concurrency, num_objects, and num_gets are spread evenly between the
+ swift-bench-client processes. With a low concurrency to
+ swift-bench-client count ratio, rounding may result in a greater
+ than desired aggregate concurrency.
+ 2) Each swift-bench-client process runs independently so some may
+ finish up before others, i.e. the target aggregate concurrency is
+ not necessarily present the whole time. This may bias aggregate
+ reported rates lower than a more efficient architecture.
+ 3) Because of #2, some swift-bench-client processes may be running GETs
+ while others are still running their PUTs. Because of this
+ potential skew, distributed runs will not isolate one operation at a
+ time like a single swift-bench run will.
+ 4) Reported aggregate rates are simply the sum of each
+ swift-bench-client process's reported FINAL numbers; since the client
+ run windows do not fully overlap (see #2), this sum is only approximate.
+ """
+
+ def __init__(self, logger, conf):
+ self.logger = logger
+ # ... INFO 1000 PUTS **FINAL** [0 failures], 34.9/s
+ self.final_re = re.compile(
+ 'INFO (\d+) (.*) \*\*FINAL\*\* \[(\d+) failures\], (\d+\.\d+)/s')
+ self.clients = conf.bench_clients
+ del conf.bench_clients
+ for k in ['put_concurrency', 'get_concurrency', 'del_concurrency',
+ 'num_objects', 'num_gets']:
+ setattr(conf, k, max(1, int(getattr(conf, k)) / len(self.clients)))
+ self.conf = conf
+
+ def run(self):
+ eventlet.patcher.monkey_patch(socket=True)
+ pool = eventlet.GreenPool(size=len(self.clients))
+ pile = eventlet.GreenPile(pool)
+ for client in self.clients:
+ pile.spawn(self.do_run, client)
+ results = {
+ 'PUTS': dict(count=0, failures=0, rate=0.0),
+ 'GETS': dict(count=0, failures=0, rate=0.0),
+ 'DEL': dict(count=0, failures=0, rate=0.0),
+ }
+ for result in pile:
+ for k, v in result.iteritems():
+ target = results[k]
+ target['count'] += int(v['count'])
+ target['failures'] += int(v['failures'])
+ target['rate'] += float(v['rate'])
+ for k in ['PUTS', 'GETS', 'DEL']:
+ v = results[k]
+ self.logger.info('%d %s **FINAL** [%d failures], %.1f/s' % (
+ v['count'], k, v['failures'], v['rate']))
+
+ def do_run(self, client):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ ip, port = client.split(':')
+ s.connect((ip, int(port)))
+ s.sendall(json.dumps(self.conf.__dict__))
+ s.shutdown(socket.SHUT_WR)
+ s_file = s.makefile('rb', 1)
+ result = {}
+ for line in s_file:
+ match = self.final_re.search(line)
+ if match:
+ g = match.groups()
+ result[g[1]] = {
+ 'count': g[0],
+ 'failures': g[2],
+ 'rate': g[3],
+ }
+ else:
+ sys.stderr.write('%s %s' % (client, line))
+ return result
+
+
class BenchController(object):
def __init__(self, logger, conf):
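
A short worked example (not part of the commit) of the **FINAL** line
handling above: the same regex pattern compiled in
DistributedBenchController.__init__ is run over two made-up client report
lines, and the counts, failures, and rates are summed the way run() does.

import re

final_re = re.compile(
    r'INFO (\d+) (.*) \*\*FINAL\*\* \[(\d+) failures\], (\d+\.\d+)/s')

# Two hypothetical swift-bench-server FINAL lines, one per bench client.
lines = [
    'swift-bench-server 2012-08-25 22:45:26,100 INFO 1000 PUTS **FINAL** [0 failures], 28.4/s',
    'swift-bench-server 2012-08-25 22:45:27,200 INFO 1000 PUTS **FINAL** [0 failures], 28.4/s',
]

totals = dict(count=0, failures=0, rate=0.0)
for line in lines:
    match = final_re.search(line)
    if match:
        count, op, failures, rate = match.groups()
        totals['count'] += int(count)
        totals['failures'] += int(failures)
        totals['rate'] += float(rate)

print('%d PUTS **FINAL** [%d failures], %.1f/s'
      % (totals['count'], totals['failures'], totals['rate']))
# -> 2000 PUTS **FINAL** [0 failures], 56.8/s

Summing per-client rates like this is exactly the approximation called out by
the last limitation listed in the DistributedBenchController docstring.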
@@ -177,16 +353,6 @@ class BenchDELETE(Bench):
self.total = len(names)
self.msg = 'DEL'
- def run(self):
- Bench.run(self)
- for container in self.containers:
- try:
- client.delete_container(self.url, self.token, container)
- except client.ClientException, e:
- if e.http_status != HTTP_CONFLICT:
- self._log_status("Unable to delete container '%s'. " \
- "Got http status '%d'." % (container, e.http_status))
-
def _run(self, thread):
if time.time() - self.heartbeat >= 15:
self.heartbeat = time.time()
@@ -242,11 +408,7 @@ class BenchPUT(Bench):
self.concurrency = self.put_concurrency
self.total = self.total_objects
self.msg = 'PUTS'
- if self.use_proxy:
- with self.connection() as conn:
- for container_name in self.containers:
- client.put_container(self.url, self.token,
- container_name, http_conn=conn)
+ self.containers = conf.containers
def _run(self, thread):
if time.time() - self.heartbeat >= 15: