Diffstat (limited to 'swift')
-rwxr-xr-x   swift/cli/recon.py                           45
-rw-r--r--   swift/common/constraints.py                  93
-rw-r--r--   swift/common/manager.py                      21
-rw-r--r--   swift/common/middleware/account_quotas.py    10
-rw-r--r--   swift/common/middleware/gatekeeper.py         3
-rw-r--r--   swift/common/middleware/proxy_logging.py     18
-rw-r--r--   swift/common/ring/builder.py                  4
-rw-r--r--   swift/common/swob.py                          5
-rw-r--r--   swift/common/utils.py                        24
-rw-r--r--   swift/common/wsgi.py                          7
-rw-r--r--   swift/container/server.py                    26
-rw-r--r--   swift/obj/auditor.py                        147
-rw-r--r--   swift/obj/diskfile.py                        26
-rw-r--r--   swift/obj/server.py                          15
-rw-r--r--   swift/proxy/controllers/base.py              47
-rw-r--r--   swift/proxy/controllers/container.py          7
-rw-r--r--   swift/proxy/server.py                        10
17 files changed, 357 insertions, 151 deletions
diff --git a/swift/cli/recon.py b/swift/cli/recon.py
index ba1448908..c91baf8e7 100755
--- a/swift/cli/recon.py
+++ b/swift/cli/recon.py
@@ -488,6 +488,24 @@ class SwiftRecon(object):
                 (self._ptime(low), self._ptime(high), self._ptime(average))
         print "=" * 79
 
+    def nested_get_value(self, key, recon_entry):
+        """
+        Generator that yields all values for given key in a recon cache entry.
+        This is for use with object auditor recon cache entries.  If the
+        object auditor has run in 'once' mode with a subset of devices
+        specified the checksum auditor section will have an entry of the form:
+            {'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}}
+        The same is true of the ZBF auditor cache entry section.  We use this
+        generator to find all instances of a particular key in these multi-
+        level dictionaries.
+        """
+        for k, v in recon_entry.items():
+            if isinstance(v, dict):
+                for value in self.nested_get_value(key, v):
+                    yield value
+            if k == key:
+                yield v
+
     def object_auditor_check(self, hosts):
         """
         Obtain and print obj auditor statistics
@@ -513,11 +531,16 @@ class SwiftRecon(object):
                 zbf_scan[url] = response['object_auditor_stats_ZBF']
         if len(all_scan) > 0:
             stats = {}
-            stats[atime] = [all_scan[i][atime] for i in all_scan]
-            stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan]
-            stats[passes] = [all_scan[i][passes] for i in all_scan]
-            stats[errors] = [all_scan[i][errors] for i in all_scan]
-            stats[quarantined] = [all_scan[i][quarantined] for i in all_scan]
+            stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
+                            for i in all_scan]
+            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+                                 all_scan[i])) for i in all_scan]
+            stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
+                             for i in all_scan]
+            stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
+                             for i in all_scan]
+            stats[quarantined] = [(self.nested_get_value(quarantined,
+                                  all_scan[i])) for i in all_scan]
             for k in stats:
                 if None in stats[k]:
                     stats[k] = [x for x in stats[k] if x is not None]
@@ -534,10 +557,14 @@ class SwiftRecon(object):
             print "[ALL_auditor] - No hosts returned valid data."
         if len(zbf_scan) > 0:
             stats = {}
-            stats[atime] = [zbf_scan[i][atime] for i in zbf_scan]
-            stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan]
-            stats[errors] = [zbf_scan[i][errors] for i in zbf_scan]
-            stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan]
+            stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
+                            for i in zbf_scan]
+            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+                                 zbf_scan[i])) for i in zbf_scan]
+            stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
+                            for i in zbf_scan]
+            stats[quarantined] = [(self.nested_get_value(quarantined,
+                                  zbf_scan[i])) for i in zbf_scan]
             for k in stats:
                 if None in stats[k]:
                     stats[k] = [x for x in stats[k] if x is not None]
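The new nested_get_value() generator is easiest to see against a concrete
cache entry. A minimal sketch (device names and counts invented for
illustration):

    # entry as written by a device-limited run ('sdb1sdb2') alongside a
    # plain top-level value
    entry = {'object_auditor_stats_ALL': {'sdb1sdb2': {'errors': 2}},
             'errors': 5}
    recon = SwiftRecon()
    # recursion finds the value at every nesting depth
    sorted(recon.nested_get_value('errors', entry))  # -> [2, 5]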
diff --git a/swift/common/constraints.py b/swift/common/constraints.py
index dd5c5c410..7a480eaec 100644
--- a/swift/common/constraints.py
+++ b/swift/common/constraints.py
@@ -18,46 +18,69 @@ import urllib
 from urllib import unquote
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError
 
-from swift.common.utils import ismount, split_path
+from swift.common.utils import ismount, split_path, SWIFT_CONF_FILE
 from swift.common.swob import HTTPBadRequest, HTTPLengthRequired, \
     HTTPRequestEntityTooLarge, HTTPPreconditionFailed
 
-constraints_conf = ConfigParser()
-constraints_conf.read('/etc/swift/swift.conf')
+MAX_FILE_SIZE = 5368709122
+MAX_META_NAME_LENGTH = 128
+MAX_META_VALUE_LENGTH = 256
+MAX_META_COUNT = 90
+MAX_META_OVERALL_SIZE = 4096
+MAX_HEADER_SIZE = 8192
+MAX_OBJECT_NAME_LENGTH = 1024
+CONTAINER_LISTING_LIMIT = 10000
+ACCOUNT_LISTING_LIMIT = 10000
+MAX_ACCOUNT_NAME_LENGTH = 256
+MAX_CONTAINER_NAME_LENGTH = 256
+
+DEFAULT_CONSTRAINTS = {
+    'max_file_size': MAX_FILE_SIZE,
+    'max_meta_name_length': MAX_META_NAME_LENGTH,
+    'max_meta_value_length': MAX_META_VALUE_LENGTH,
+    'max_meta_count': MAX_META_COUNT,
+    'max_meta_overall_size': MAX_META_OVERALL_SIZE,
+    'max_header_size': MAX_HEADER_SIZE,
+    'max_object_name_length': MAX_OBJECT_NAME_LENGTH,
+    'container_listing_limit': CONTAINER_LISTING_LIMIT,
+    'account_listing_limit': ACCOUNT_LISTING_LIMIT,
+    'max_account_name_length': MAX_ACCOUNT_NAME_LENGTH,
+    'max_container_name_length': MAX_CONTAINER_NAME_LENGTH,
+}
+
+SWIFT_CONSTRAINTS_LOADED = False
+OVERRIDE_CONSTRAINTS = {}  # any constraints overridden by SWIFT_CONF_FILE
+EFFECTIVE_CONSTRAINTS = {}  # populated by reload_constraints
+
+
+def reload_constraints():
+    """
+    Parse SWIFT_CONF_FILE and reset module level global constraint attrs,
+    populating OVERRIDE_CONSTRAINTS and EFFECTIVE_CONSTRAINTS along the way.
+    """
+    global SWIFT_CONSTRAINTS_LOADED, OVERRIDE_CONSTRAINTS
+    SWIFT_CONSTRAINTS_LOADED = False
+    OVERRIDE_CONSTRAINTS = {}
+    constraints_conf = ConfigParser()
+    if constraints_conf.read(SWIFT_CONF_FILE):
+        SWIFT_CONSTRAINTS_LOADED = True
+        for name in DEFAULT_CONSTRAINTS:
+            try:
+                value = int(constraints_conf.get('swift-constraints', name))
+            except (NoSectionError, NoOptionError):
+                pass
+            else:
+                OVERRIDE_CONSTRAINTS[name] = value
+    for name, default in DEFAULT_CONSTRAINTS.items():
+        value = OVERRIDE_CONSTRAINTS.get(name, default)
+        EFFECTIVE_CONSTRAINTS[name] = value
+        # "globals" in this context is module level globals, always.
+        globals()[name.upper()] = value
+
+
+reload_constraints()
 
-def constraints_conf_int(name, default):
-    try:
-        return int(constraints_conf.get('swift-constraints', name))
-    except (NoSectionError, NoOptionError):
-        return default
-
-
-#: Max file size allowed for objects
-MAX_FILE_SIZE = constraints_conf_int('max_file_size',
-                                     5368709122)  # 5 * 1024 * 1024 * 1024 + 2
-#: Max length of the name of a key for metadata
-MAX_META_NAME_LENGTH = constraints_conf_int('max_meta_name_length', 128)
-#: Max length of the value of a key for metadata
-MAX_META_VALUE_LENGTH = constraints_conf_int('max_meta_value_length', 256)
-#: Max number of metadata items
-MAX_META_COUNT = constraints_conf_int('max_meta_count', 90)
-#: Max overall size of metadata
-MAX_META_OVERALL_SIZE = constraints_conf_int('max_meta_overall_size', 4096)
-#: Max size of any header
-MAX_HEADER_SIZE = constraints_conf_int('max_header_size', 8192)
-#: Max object name length
-MAX_OBJECT_NAME_LENGTH = constraints_conf_int('max_object_name_length', 1024)
-#: Max object list length of a get request for a container
-CONTAINER_LISTING_LIMIT = constraints_conf_int('container_listing_limit',
-                                               10000)
-#: Max container list length of a get request for an account
-ACCOUNT_LISTING_LIMIT = constraints_conf_int('account_listing_limit', 10000)
-#: Max account name length
-MAX_ACCOUNT_NAME_LENGTH = constraints_conf_int('max_account_name_length', 256)
-#: Max container name length
-MAX_CONTAINER_NAME_LENGTH = constraints_conf_int('max_container_name_length',
-                                                 256)
 
 # Maximum slo segments in buffer
 MAX_BUFFERED_SLO_SEGMENTS = 10000
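Folding the defaults into DEFAULT_CONSTRAINTS and rebinding the module
globals through reload_constraints() means tests and long-running processes
can re-read swift.conf without re-importing the module. A sketch, assuming a
swift.conf that carries one override:

    # /etc/swift/swift.conf (assumed contents):
    #     [swift-constraints]
    #     max_object_name_length = 2048

    from swift.common import constraints
    constraints.reload_constraints()
    constraints.OVERRIDE_CONSTRAINTS     # {'max_object_name_length': 2048}
    constraints.MAX_OBJECT_NAME_LENGTH   # 2048 -- module global rebound
    constraints.EFFECTIVE_CONSTRAINTS['max_file_size']  # default 5368709122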
diff --git a/swift/common/manager.py b/swift/common/manager.py
index 47b275e6e..28e928337 100644
--- a/swift/common/manager.py
+++ b/swift/common/manager.py
@@ -163,6 +163,9 @@ class Manager(object):
         for name in server_names:
             self.servers.add(Server(name, run_dir))
 
+    def __iter__(self):
+        return iter(self.servers)
+
     @command
     def status(self, **kwargs):
         """display status of tracked pids for server
@@ -251,6 +254,17 @@ class Manager(object):
             return 1
 
     @command
+    def kill(self, **kwargs):
+        """stop a server (no error if not running)
+        """
+        status = self.stop(**kwargs)
+        kwargs['quiet'] = True
+        if status and not self.status(**kwargs):
+            # only exit error if the server is still running
+            return status
+        return 0
+
+    @command
     def shutdown(self, **kwargs):
         """allow current requests to finish on supporting servers
        """
@@ -523,7 +537,7 @@ class Server(object):
        :param conf_file: path to conf_file to use as first arg
        :param once: boolean, add once argument to command
        :param wait: boolean, if true capture stdout with a pipe
-        :param daemon: boolean, if true ask server to log to console
+        :param daemon: boolean, if false ask server to log to console
 
        :returns : the pid of the spawned process
        """
@@ -560,6 +574,11 @@ class Server(object):
         for proc in self.procs:
             # wait for process to close its stdout
             output = proc.stdout.read()
+            if kwargs.get('once', False):
+                # if you don't want once to wait you can send it to the
+                # background on the command line, I generally just run with
+                # no-daemon anyway, but this is quieter
+                proc.wait()
             if output:
                 print output
             start = time.time()
diff --git a/swift/common/middleware/account_quotas.py b/swift/common/middleware/account_quotas.py
index e4aa23504..ec4a3fda2 100644
--- a/swift/common/middleware/account_quotas.py
+++ b/swift/common/middleware/account_quotas.py
@@ -52,7 +52,7 @@ Due to the eventual consistency further uploads might be possible until the
 account size has been updated.
 """
 
-
+from swift.common.constraints import check_copy_from_header
 from swift.common.swob import HTTPForbidden, HTTPRequestEntityTooLarge, \
     HTTPBadRequest, wsgify
 from swift.common.utils import register_swift_info
@@ -109,7 +109,11 @@ class AccountQuotaMiddleware(object):
             if request.method == 'COPY':
                 copy_from = container + '/' + obj
             else:
-                copy_from = request.headers.get('X-Copy-From')
+                if 'x-copy-from' in request.headers:
+                    src_cont, src_obj = check_copy_from_header(request)
+                    copy_from = "%s/%s" % (src_cont, src_obj)
+                else:
+                    copy_from = None
 
         content_length = (request.content_length or 0)
 
@@ -124,7 +128,7 @@ class AccountQuotaMiddleware(object):
             return self.app
 
         if copy_from:
-            path = '/' + ver + '/' + account + '/' + copy_from.lstrip('/')
+            path = '/' + ver + '/' + account + '/' + copy_from
             object_info = get_object_info(request.environ, self.app, path)
             if not object_info or not object_info['length']:
                 content_length = 0
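The quota middleware now validates X-Copy-From through
check_copy_from_header() instead of trusting the raw header, which is why the
later .lstrip('/') became unnecessary: the helper hands back a clean
(container, object) pair, or rejects the request itself. Roughly, under the
behaviour the diff relies on:

    # X-Copy-From: /cont/obj  ->  ('cont', 'obj')
    #     copy_from = 'cont/obj'
    #     path = '/v1/AUTH_test/cont/obj'   (account name assumed)
    # X-Copy-From: no-slash   ->  precondition-failed error raised by
    #     the helper before any backend path is built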
""" - +from swift.common.constraints import check_copy_from_header from swift.common.swob import HTTPForbidden, HTTPRequestEntityTooLarge, \ HTTPBadRequest, wsgify from swift.common.utils import register_swift_info @@ -109,7 +109,11 @@ class AccountQuotaMiddleware(object): if request.method == 'COPY': copy_from = container + '/' + obj else: - copy_from = request.headers.get('X-Copy-From') + if 'x-copy-from' in request.headers: + src_cont, src_obj = check_copy_from_header(request) + copy_from = "%s/%s" % (src_cont, src_obj) + else: + copy_from = None content_length = (request.content_length or 0) @@ -124,7 +128,7 @@ class AccountQuotaMiddleware(object): return self.app if copy_from: - path = '/' + ver + '/' + account + '/' + copy_from.lstrip('/') + path = '/' + ver + '/' + account + '/' + copy_from object_info = get_object_info(request.environ, self.app, path) if not object_info or not object_info['length']: content_length = 0 diff --git a/swift/common/middleware/gatekeeper.py b/swift/common/middleware/gatekeeper.py index f645d1dd5..5e680d0e2 100644 --- a/swift/common/middleware/gatekeeper.py +++ b/swift/common/middleware/gatekeeper.py @@ -45,7 +45,8 @@ import re # rather than prefix match. inbound_exclusions = [get_sys_meta_prefix('account'), get_sys_meta_prefix('container'), - get_sys_meta_prefix('object')] + get_sys_meta_prefix('object'), + 'x-backend'] # 'x-object-sysmeta' is reserved in anticipation of future support # for system metadata being applied to objects diff --git a/swift/common/middleware/proxy_logging.py b/swift/common/middleware/proxy_logging.py index e3b3b89ae..073270062 100644 --- a/swift/common/middleware/proxy_logging.py +++ b/swift/common/middleware/proxy_logging.py @@ -124,11 +124,11 @@ class ProxyLoggingMiddleware(object): def method_from_req(self, req): return req.environ.get('swift.orig_req_method', req.method) - def req_already_logged(self, req): - return req.environ.get('swift.proxy_access_log_made') + def req_already_logged(self, env): + return env.get('swift.proxy_access_log_made') - def mark_req_logged(self, req): - req.environ['swift.proxy_access_log_made'] = True + def mark_req_logged(self, env): + env['swift.proxy_access_log_made'] = True def obscure_sensitive(self, value): if value and len(value) > self.reveal_sensitive_prefix: @@ -147,8 +147,6 @@ class ProxyLoggingMiddleware(object): :param start_time: timestamp request started :param end_time: timestamp request completed """ - if self.req_already_logged(req): - return req_path = get_valid_utf8_str(req.path) the_request = quote(unquote(req_path), QUOTE_SAFE) if req.query_string: @@ -193,7 +191,6 @@ class ProxyLoggingMiddleware(object): start_time_str, end_time_str ))) - self.mark_req_logged(req) # Log timing and bytes-transfered data to StatsD metric_name = self.statsd_metric_name(req, status_int, method) # Only log data for valid controllers (or SOS) to keep the metric count @@ -220,6 +217,11 @@ class ProxyLoggingMiddleware(object): return '.'.join((stat_type, stat_method, str(status_int))) def __call__(self, env, start_response): + if self.req_already_logged(env): + return self.app(env, start_response) + + self.mark_req_logged(env) + start_response_args = [None] input_proxy = InputProxy(env['wsgi.input']) env['wsgi.input'] = input_proxy @@ -261,7 +263,7 @@ class ProxyLoggingMiddleware(object): # Log timing information for time-to-first-byte (GET requests only) method = self.method_from_req(req) - if method == 'GET' and not self.req_already_logged(req): + if method == 'GET': status_int = 
diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py
index ae72e9cf9..978e6c51f 100644
--- a/swift/common/ring/builder.py
+++ b/swift/common/ring/builder.py
@@ -754,6 +754,7 @@ class RingBuilder(object):
         """
         for dev in self._iter_devs():
             dev['sort_key'] = self._sort_key_for(dev)
+            dev['tiers'] = tiers_for_dev(dev)
 
         available_devs = \
             sorted((d for d in self._iter_devs() if d['weight']),
@@ -764,7 +765,6 @@ class RingBuilder(object):
         tier2dev_sort_key = defaultdict(list)
         max_tier_depth = 0
         for dev in available_devs:
-            dev['tiers'] = tiers_for_dev(dev)
             for tier in dev['tiers']:
                 tier2devs[tier].append(dev)  # <-- starts out sorted!
                 tier2dev_sort_key[tier].append(dev['sort_key'])
@@ -889,7 +889,7 @@ class RingBuilder(object):
         # Just to save memory and keep from accidental reuse.
         for dev in self._iter_devs():
             del dev['sort_key']
-            dev.pop('tiers', None)  # May be absent for devices w/o weight
+            del dev['tiers']
 
     def _sort_key_for(self, dev):
         return (dev['parts_wanted'], random.randint(0, 0xFFFF), dev['id'])
diff --git a/swift/common/swob.py b/swift/common/swob.py
index ba3b54bb7..638086ea0 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -138,12 +138,9 @@ def _datetime_property(header):
         if value is not None:
             try:
                 parts = parsedate(self.headers[header])[:7]
-                date = datetime(*(parts + (UTC,)))
+                return datetime(*(parts + (UTC,)))
             except Exception:
                 return None
-            if date.year < 1970:
-                raise ValueError('Somehow an invalid year')
-            return date
 
     def setter(self, value):
         if isinstance(value, (float, int, long)):
diff --git a/swift/common/utils.py b/swift/common/utils.py
index abebc6515..be76ddb6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2079,6 +2079,28 @@ def human_readable(value):
     return '%d%si' % (round(value), suffixes[index])
 
 
+def put_recon_cache_entry(cache_entry, key, item):
+    """
+    Function that will check if item is a dict, and if so put it under
+    cache_entry[key].  We use nested recon cache entries when the object
+    auditor runs in 'once' mode with a specified subset of devices.
+    """
+    if isinstance(item, dict):
+        if key not in cache_entry or key in cache_entry and not \
+                isinstance(cache_entry[key], dict):
+            cache_entry[key] = {}
+        elif key in cache_entry and item == {}:
+            cache_entry.pop(key, None)
+            return
+        for k, v in item.items():
+            if v == {}:
+                cache_entry[key].pop(k, None)
+            else:
+                cache_entry[key][k] = v
+    else:
+        cache_entry[key] = item
+
+
 def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
     """Update recon cache values
 
@@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
                     #file doesn't have a valid entry, we'll recreate it
                     pass
             for cache_key, cache_value in cache_dict.items():
-                cache_entry[cache_key] = cache_value
+                put_recon_cache_entry(cache_entry, cache_key, cache_value)
             try:
                 with NamedTemporaryFile(dir=os.path.dirname(cache_file),
                                         delete=False) as tf:
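put_recon_cache_entry() gives dict values merge-and-delete semantics rather
than blind replacement, which is what lets per-device auditor entries
accumulate under one key. A short sketch:

    from swift.common.utils import put_recon_cache_entry

    cache = {}
    put_recon_cache_entry(cache, 'object_auditor_stats_ALL',
                          {'sdb1': {'errors': 0}})
    put_recon_cache_entry(cache, 'object_auditor_stats_ALL',
                          {'sdb2': {'errors': 3}})
    # both device keys survive under the one entry:
    # {'object_auditor_stats_ALL': {'sdb1': {'errors': 0},
    #                               'sdb2': {'errors': 3}}}
    put_recon_cache_entry(cache, 'object_auditor_stats_ALL', {'sdb1': {}})
    # an empty dict value deletes just that sub-key; 'sdb2' remains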
+ """ + if isinstance(item, dict): + if key not in cache_entry or key in cache_entry and not \ + isinstance(cache_entry[key], dict): + cache_entry[key] = {} + elif key in cache_entry and item == {}: + cache_entry.pop(key, None) + return + for k, v in item.items(): + if v == {}: + cache_entry[key].pop(k, None) + else: + cache_entry[key][k] = v + else: + cache_entry[key] = item + + def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2): """Update recon cache values @@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2): #file doesn't have a valid entry, we'll recreate it pass for cache_key, cache_value in cache_dict.items(): - cache_entry[cache_key] = cache_value + put_recon_cache_entry(cache_entry, cache_key, cache_value) try: with NamedTemporaryFile(dir=os.path.dirname(cache_file), delete=False) as tf: diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index eab0988b1..19d5c689c 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -131,8 +131,10 @@ def monkey_patch_mimetools(): self.plisttext = '' else: orig_parsetype(self) + parsetype.patched = True - mimetools.Message.parsetype = parsetype + if not getattr(mimetools.Message.parsetype, 'patched', None): + mimetools.Message.parsetype = parsetype def get_socket(conf, default_port=8080): @@ -573,7 +575,8 @@ def make_env(env, method=None, path=None, agent='Swift', query_string=None, newenv = {} for name in ('eventlet.posthooks', 'HTTP_USER_AGENT', 'HTTP_HOST', 'PATH_INFO', 'QUERY_STRING', 'REMOTE_USER', 'REQUEST_METHOD', - 'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', 'HTTP_ORIGIN', + 'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', + 'HTTP_ORIGIN', 'HTTP_ACCESS_CONTROL_REQUEST_METHOD', 'SERVER_PROTOCOL', 'swift.cache', 'swift.source', 'swift.trans_id', 'swift.authorize_override', 'swift.authorize'): diff --git a/swift/container/server.py b/swift/container/server.py index 7f6e30ad9..4242465bf 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -227,6 +227,20 @@ class ContainerController(object): return HTTPNoContent(request=req) return HTTPNotFound() + def _update_or_create(self, req, broker, timestamp): + if not os.path.exists(broker.db_file): + try: + broker.initialize(timestamp) + except DatabaseAlreadyExists: + pass + else: + return True # created + created = broker.is_deleted() + broker.update_put_timestamp(timestamp) + if broker.is_deleted(): + raise HTTPConflict(request=req) + return created + @public @timing_stats() def PUT(self, req): @@ -261,17 +275,7 @@ class ContainerController(object): req.headers['x-etag']) return HTTPCreated(request=req) else: # put container - if not os.path.exists(broker.db_file): - try: - broker.initialize(timestamp) - created = True - except DatabaseAlreadyExists: - created = False - else: - created = broker.is_deleted() - broker.update_put_timestamp(timestamp) - if broker.is_deleted(): - return HTTPConflict(request=req) + created = self._update_or_create(req, broker, timestamp) metadata = {} metadata.update( (key, (value, timestamp)) diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index d22fd5f42..c5b708acc 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -14,14 +14,16 @@ # limitations under the License. 
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index d22fd5f42..c5b708acc 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -14,14 +14,16 @@
 # limitations under the License.
 
 import os
+import sys
 import time
+import signal
 from swift import gettext_ as _
 from contextlib import closing
 from eventlet import Timeout
 
 from swift.obj import diskfile
 from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
-    list_from_csv, json
+    list_from_csv, json, listdir
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
 from swift.common.daemon import Daemon
 
@@ -31,10 +33,10 @@ SLEEP_BETWEEN_AUDITS = 30
 class AuditorWorker(object):
     """Walk through file system to audit objects"""
 
-    def __init__(self, conf, logger, zero_byte_only_at_fps=0):
+    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
         self.conf = conf
         self.logger = logger
-        self.devices = conf.get('devices', '/srv/node')
+        self.devices = devices
         self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
         self.max_files_per_second = float(conf.get('files_per_second', 20))
         self.max_bytes_per_second = float(conf.get('bytes_per_second',
@@ -53,24 +55,34 @@ class AuditorWorker(object):
         self.passes = 0
         self.quarantines = 0
         self.errors = 0
-        self.recon_cache_path = conf.get('recon_cache_path',
-                                         '/var/cache/swift')
-        self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+        self.rcache = rcache
         self.stats_sizes = sorted(
             [int(s) for s in list_from_csv(conf.get('object_size_stats'))])
         self.stats_buckets = dict(
             [(s, 0) for s in self.stats_sizes + ['OVER']])
 
-    def audit_all_objects(self, mode='once'):
-        self.logger.info(_('Begin object audit "%s" mode (%s)') %
-                         (mode, self.auditor_type))
+    def create_recon_nested_dict(self, top_level_key, device_list, item):
+        if device_list:
+            device_key = ''.join(sorted(device_list))
+            return {top_level_key: {device_key: item}}
+        else:
+            return {top_level_key: item}
+
+    def audit_all_objects(self, mode='once', device_dirs=None):
+        description = ''
+        if device_dirs:
+            device_dir_str = ','.join(sorted(device_dirs))
+            description = _(' - %s') % device_dir_str
+        self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
+                         (mode, self.auditor_type, description))
         begin = reported = time.time()
         self.total_bytes_processed = 0
         self.total_files_processed = 0
         total_quarantines = 0
         total_errors = 0
         time_auditing = 0
-        all_locs = self.diskfile_mgr.object_audit_location_generator()
+        all_locs = self.diskfile_mgr.object_audit_location_generator(
+            device_dirs=device_dirs)
         for location in all_locs:
             loop_time = time.time()
             self.failsafe_object_audit(location)
@@ -87,7 +99,7 @@ class AuditorWorker(object):
                     'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
                     'Total time: %(total).2f, Auditing time: %(audit).2f, '
                     'Rate: %(audit_rate).2f') % {
-                        'type': self.auditor_type,
+                        'type': '%s%s' % (self.auditor_type, description),
                         'start_time': time.ctime(reported),
                         'passes': self.passes, 'quars': self.quarantines,
                         'errors': self.errors,
                         'frate': self.passes / (now - reported),
                         'brate': self.bytes_processed / (now - reported),
                         'total': (now - begin), 'audit': time_auditing,
                         'audit_rate': time_auditing / (now - begin)})
-                dump_recon_cache({'object_auditor_stats_%s' %
-                                  self.auditor_type: {
-                                      'errors': self.errors,
-                                      'passes': self.passes,
-                                      'quarantined': self.quarantines,
-                                      'bytes_processed': self.bytes_processed,
-                                      'start_time': reported,
-                                      'audit_time': time_auditing}},
-                                 self.rcache, self.logger)
+                cache_entry = self.create_recon_nested_dict(
+                    'object_auditor_stats_%s' % (self.auditor_type),
+                    device_dirs,
+                    {'errors': self.errors, 'passes': self.passes,
+                     'quarantined': self.quarantines,
+                     'bytes_processed': self.bytes_processed,
+                     'start_time': reported, 'audit_time': time_auditing})
+                dump_recon_cache(cache_entry, self.rcache, self.logger)
                 reported = now
                 total_quarantines += self.quarantines
                 total_errors += self.errors
@@ -120,12 +131,19 @@ class AuditorWorker(object):
             'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
             'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
             'Rate: %(audit_rate).2f') % {
-                'type': self.auditor_type, 'mode': mode, 'elapsed': elapsed,
+                'type': '%s%s' % (self.auditor_type, description),
+                'mode': mode, 'elapsed': elapsed,
                 'quars': total_quarantines + self.quarantines,
                 'errors': total_errors + self.errors,
                 'frate': self.total_files_processed / elapsed,
                 'brate': self.total_bytes_processed / elapsed,
                 'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
+        # Clear recon cache entry if device_dirs is set
+        if device_dirs:
+            cache_entry = self.create_recon_nested_dict(
+                'object_auditor_stats_%s' % (self.auditor_type),
+                device_dirs, {})
+            dump_recon_cache(cache_entry, self.rcache, self.logger)
         if self.stats_sizes:
             self.logger.info(
                 _('Object audit stats: %s') % json.dumps(self.stats_buckets))
@@ -204,35 +222,100 @@ class ObjectAuditor(Daemon):
     def __init__(self, conf, **options):
         self.conf = conf
         self.logger = get_logger(conf, log_route='object-auditor')
+        self.devices = conf.get('devices', '/srv/node')
         self.conf_zero_byte_fps = int(
             conf.get('zero_byte_files_per_second', 50))
+        self.recon_cache_path = conf.get('recon_cache_path',
+                                         '/var/cache/swift')
+        self.rcache = os.path.join(self.recon_cache_path, "object.recon")
 
     def _sleep(self):
         time.sleep(SLEEP_BETWEEN_AUDITS)
 
+    def clear_recon_cache(self, auditor_type):
+        """Clear recon cache entries"""
+        dump_recon_cache({'object_auditor_stats_%s' % auditor_type: {}},
+                         self.rcache, self.logger)
+
+    def run_audit(self, **kwargs):
+        """Run the object audit"""
+        mode = kwargs.get('mode')
+        zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
+        device_dirs = kwargs.get('device_dirs')
+        worker = AuditorWorker(self.conf, self.logger, self.rcache,
+                               self.devices,
+                               zero_byte_only_at_fps=zero_byte_only_at_fps)
+        worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
+
+    def fork_child(self, zero_byte_fps=False, **kwargs):
+        """Child execution"""
+        pid = os.fork()
+        if pid:
+            return pid
+        else:
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            if zero_byte_fps:
+                kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
+            self.run_audit(**kwargs)
+            sys.exit()
+
+    def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
+        """Audit loop"""
+        self.clear_recon_cache('ALL')
+        self.clear_recon_cache('ZBF')
+        kwargs['device_dirs'] = override_devices
+        if parent:
+            kwargs['zero_byte_fps'] = zbo_fps
+            self.run_audit(**kwargs)
+        else:
+            pids = []
+            if self.conf_zero_byte_fps:
+                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+                pids.append(zbf_pid)
+            pids.append(self.fork_child(**kwargs))
+            while pids:
+                pid = os.wait()[0]
+                # ZBF scanner must be restarted as soon as it finishes
+                if self.conf_zero_byte_fps and pid == zbf_pid and \
+                        len(pids) > 1:
+                    kwargs['device_dirs'] = override_devices
+                    zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+                    pids.append(zbf_pid)
+                pids.remove(pid)
+
     def run_forever(self, *args, **kwargs):
         """Run the object audit until stopped."""
         # zero byte only command line option
         zbo_fps = kwargs.get('zero_byte_fps', 0)
+        parent = False
         if zbo_fps:
             # only start parent
             parent = True
-        else:
-            parent = os.fork()  # child gets parent = 0
         kwargs = {'mode': 'forever'}
-        if parent:
-            kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
+
         while True:
             try:
-                self.run_once(**kwargs)
+                self.audit_loop(parent, zbo_fps, **kwargs)
             except (Exception, Timeout):
                 self.logger.exception(_('ERROR auditing'))
             self._sleep()
 
     def run_once(self, *args, **kwargs):
-        """Run the object audit once."""
-        mode = kwargs.get('mode', 'once')
-        zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
-        worker = AuditorWorker(self.conf, self.logger,
-                               zero_byte_only_at_fps=zero_byte_only_at_fps)
-        worker.audit_all_objects(mode=mode)
+        """Run the object audit once"""
+        # zero byte only command line option
+        zbo_fps = kwargs.get('zero_byte_fps', 0)
+        override_devices = list_from_csv(kwargs.get('devices'))
+        # Remove bogus entries and duplicates from override_devices
+        override_devices = list(
+            set(listdir(self.devices)).intersection(set(override_devices)))
+        parent = False
+        if zbo_fps:
+            # only start parent
+            parent = True
+        kwargs = {'mode': 'once'}
+
+        try:
+            self.audit_loop(parent, zbo_fps, override_devices=override_devices,
+                            **kwargs)
+        except (Exception, Timeout):
+            self.logger.exception(_('ERROR auditing'))
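With run_once() now accepting a devices list, a one-shot audit can be scoped
to particular disks; bogus names are silently dropped by the listdir()
intersection before any forking happens. A minimal sketch (conf values and
device names assumed):

    from swift.obj.auditor import ObjectAuditor

    conf = {'devices': '/srv/node', 'mount_check': 'false'}
    auditor = ObjectAuditor(conf)
    # audit only sdb1 and sdb2 in one pass; their stats land in a nested
    # recon entry keyed 'sdb1sdb2' and are cleared once the pass completes
    auditor.run_once(devices='sdb1,sdb2')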
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 7c0a9483a..1e7481557 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -351,22 +351,32 @@ class AuditLocation(object):
         return str(self.path)
 
 
-def object_audit_location_generator(devices, mount_check=True, logger=None):
+def object_audit_location_generator(devices, mount_check=True, logger=None,
+                                    device_dirs=None):
     """
     Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
-    objects stored under that directory. The AuditLocation only knows the path
-    to the hash directory, not to the .data file therein (if any). This is to
-    avoid a double listdir(hash_dir); the DiskFile object will always do one,
-    so we don't.
+    objects stored under that directory if device_dirs isn't set.  If
+    device_dirs is set, only yield AuditLocation for the objects under the
+    entries in device_dirs.  The AuditLocation only knows the path to the hash
+    directory, not to the .data file therein (if any).  This is to avoid a
+    double listdir(hash_dir); the DiskFile object will always do one, so
+    we don't.
 
     :param devices: parent directory of the devices to be audited
     :param mount_check: flag to check if a mount check should be performed
                         on devices
     :param logger: a logger object
+    :device_dirs: a list of directories under devices to traverse
     """
-    device_dirs = listdir(devices)
+    if not device_dirs:
+        device_dirs = listdir(devices)
+    else:
+        # remove bogus devices and duplicates from device_dirs
+        device_dirs = list(
+            set(listdir(devices)).intersection(set(device_dirs)))
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dirs)
+
     for device in device_dirs:
         if mount_check and not \
                 ismount(os.path.join(devices, device)):
@@ -502,9 +512,9 @@ class DiskFileManager(object):
         return DiskFile(self, dev_path, self.threadpools[device],
                         partition, account, container, obj, **kwargs)
 
-    def object_audit_location_generator(self):
+    def object_audit_location_generator(self, device_dirs=None):
         return object_audit_location_generator(self.devices, self.mount_check,
-                                               self.logger)
+                                               self.logger, device_dirs)
 
     def get_diskfile_from_audit_location(self, audit_location):
         dev_path = self.get_dev_path(audit_location.device, mount_check=False)
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 3436b632c..a4d8de7c9 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -481,21 +481,16 @@ class ObjectController(object):
             obj_size = int(metadata['Content-Length'])
             file_x_ts = metadata['X-Timestamp']
             file_x_ts_flt = float(file_x_ts)
-            try:
-                if_unmodified_since = request.if_unmodified_since
-            except (OverflowError, ValueError):
-                # catches timestamps before the epoch
-                return HTTPPreconditionFailed(request=request)
             file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)
+
+            if_unmodified_since = request.if_unmodified_since
             if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
                 return HTTPPreconditionFailed(request=request)
-            try:
-                if_modified_since = request.if_modified_since
-            except (OverflowError, ValueError):
-                # catches timestamps before the epoch
-                return HTTPPreconditionFailed(request=request)
+
+            if_modified_since = request.if_modified_since
             if if_modified_since and file_x_ts_utc <= if_modified_since:
                 return HTTPNotModified(request=request)
+
             keep_cache = (self.keep_cache_private or
                           ('X-Auth-Token' not in request.headers and
                            'X-Storage-Token' not in request.headers))
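The object server can shed its try/except pairs because of the swob change
earlier in this diff: a pre-1970 date now parses to an ordinary datetime
instead of raising, and an unparsable one still comes back as None. For
instance:

    from swift.common.swob import Request

    req = Request.blank(
        '/v1/a/c/o',
        headers={'If-Modified-Since': 'Sat, 01 Jan 1966 00:00:00 GMT'})
    req.if_modified_since  # datetime(1966, 1, 1, ...) -- no ValueError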
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 7aaf2d5f8..139d6609f 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -212,6 +212,10 @@ def cors_validation(func):
             # Call through to the decorated method
             resp = func(*a, **kw)
 
+            if controller.app.strict_cors_mode and \
+                    not controller.is_origin_allowed(cors_info, req_origin):
+                return resp
+
             # Expose,
             #  - simple response headers,
             #    http://www.w3.org/TR/cors/#simple-response-header
@@ -219,24 +223,32 @@ def cors_validation(func):
             #  - user metadata headers
             #  - headers provided by the user in
             #    x-container-meta-access-control-expose-headers
-            expose_headers = ['cache-control', 'content-language',
-                              'content-type', 'expires', 'last-modified',
-                              'pragma', 'etag', 'x-timestamp', 'x-trans-id']
-            for header in resp.headers:
-                if header.startswith('X-Container-Meta') or \
-                        header.startswith('X-Object-Meta'):
-                    expose_headers.append(header.lower())
-            if cors_info.get('expose_headers'):
-                expose_headers.extend(
-                    [header_line.strip()
-                     for header_line in cors_info['expose_headers'].split(' ')
-                     if header_line.strip()])
-            resp.headers['Access-Control-Expose-Headers'] = \
-                ', '.join(expose_headers)
+            if 'Access-Control-Expose-Headers' not in resp.headers:
+                expose_headers = [
+                    'cache-control', 'content-language', 'content-type',
+                    'expires', 'last-modified', 'pragma', 'etag',
+                    'x-timestamp', 'x-trans-id']
+                for header in resp.headers:
+                    if header.startswith('X-Container-Meta') or \
+                            header.startswith('X-Object-Meta'):
+                        expose_headers.append(header.lower())
+                if cors_info.get('expose_headers'):
+                    expose_headers.extend(
+                        [header_line.strip()
+                         for header_line in
+                         cors_info['expose_headers'].split(' ')
+                         if header_line.strip()])
+                resp.headers['Access-Control-Expose-Headers'] = \
+                    ', '.join(expose_headers)
 
             # The user agent won't process the response if the Allow-Origin
             # header isn't included
-            resp.headers['Access-Control-Allow-Origin'] = req_origin
+            if 'Access-Control-Allow-Origin' not in resp.headers:
+                if cors_info['allow_origin'] and \
+                        cors_info['allow_origin'].strip() == '*':
+                    resp.headers['Access-Control-Allow-Origin'] = '*'
+                else:
+                    resp.headers['Access-Control-Allow-Origin'] = req_origin
 
             return resp
         else:
@@ -1256,7 +1268,10 @@ class Controller(object):
                 list_from_csv(req.headers['Access-Control-Request-Headers']))
 
         # Populate the response with the CORS preflight headers
-        headers['access-control-allow-origin'] = req_origin_value
+        if cors.get('allow_origin', '').strip() == '*':
+            headers['access-control-allow-origin'] = '*'
+        else:
+            headers['access-control-allow-origin'] = req_origin_value
         if cors.get('max_age') is not None:
            headers['access-control-max-age'] = cors.get('max_age')
        headers['access-control-allow-methods'] = \
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index d4425d3a7..96cef9f41 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -15,8 +15,9 @@
 
 from swift import gettext_ as _
 from urllib import unquote
+import time
 
-from swift.common.utils import public, csv_append
+from swift.common.utils import public, csv_append, normalize_timestamp
 from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH
 from swift.common.http import HTTP_ACCEPTED
 from swift.proxy.controllers.base import Controller, delay_denial, \
@@ -182,7 +183,9 @@ class ContainerController(Controller):
 
     def _backend_requests(self, req, n_outgoing,
                           account_partition, accounts):
-        headers = [self.generate_request_headers(req, transfer=True)
+        additional = {'X-Timestamp': normalize_timestamp(time.time())}
+        headers = [self.generate_request_headers(req, transfer=True,
+                                                 additional=additional)
                    for _junk in range(n_outgoing)]
 
         for i, account in enumerate(accounts):
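Taken together, the base.py hunks give CORS three new behaviours: a
disallowed origin under strict mode returns the response untouched, CORS
headers already set by the application win, and a container allow-origin of
'*' is echoed as the literal wildcard. Sketched with assumed container
metadata:

    # X-Container-Meta-Access-Control-Allow-Origin: *
    #
    # a preflight response now carries
    #     Access-Control-Allow-Origin: *
    # rather than reflecting the requester's Origin header back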
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index d42688b42..f6af6e63c 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -130,6 +130,8 @@ class Application(object):
             a.strip()
             for a in conf.get('cors_allow_origin', '').split(',')
             if a.strip()]
+        self.strict_cors_mode = config_true_value(
+            conf.get('strict_cors_mode', 't'))
         self.node_timings = {}
         self.timing_expiry = int(conf.get('timing_expiry', 300))
         self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
@@ -210,7 +212,8 @@ class Application(object):
             container_listing_limit=constraints.CONTAINER_LISTING_LIMIT,
             max_account_name_length=constraints.MAX_ACCOUNT_NAME_LENGTH,
             max_container_name_length=constraints.MAX_CONTAINER_NAME_LENGTH,
-            max_object_name_length=constraints.MAX_OBJECT_NAME_LENGTH)
+            max_object_name_length=constraints.MAX_OBJECT_NAME_LENGTH,
+            strict_cors_mode=self.strict_cors_mode)
 
     def check_config(self):
         """
@@ -261,11 +264,6 @@ class Application(object):
         try:
             if self.memcache is None:
                 self.memcache = cache_from_env(env)
-            # Remove any x-backend-* headers since those are reserved for use
-            # by backends communicating with each other; no end user should be
-            # able to send those into the cluster.
-            for key in list(k for k in env if k.startswith('HTTP_X_BACKEND_')):
-                del env[key]
             req = self.update_request(Request(env))
             return self.handle_request(req)(env, start_response)
         except UnicodeError:
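Dropping the HTTP_X_BACKEND_ scrubbing here is safe because gatekeeper (see
its hunk above) now strips 'x-backend' headers inbound for the whole
pipeline. The new strict_cors_mode knob is read from the proxy config; a
sketch of the relevant proxy-server.conf lines (values assumed; the coded
default of 't' means strict mode is on):

    [app:proxy-server]
    use = egg:swift#proxy
    strict_cors_mode = true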