diff options
Diffstat (limited to 'swift/container/sync.py')
-rw-r--r-- | swift/container/sync.py | 123 |
1 files changed, 92 insertions, 31 deletions
diff --git a/swift/container/sync.py b/swift/container/sync.py index 4bf5fc5c3..edda5d1b6 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import uuid from swift import gettext_ as _ @@ -25,8 +26,8 @@ from eventlet import sleep, Timeout import swift.common.db from swift.container.backend import ContainerBroker, DATADIR from swift.common.container_sync_realms import ContainerSyncRealms -from swift.common.direct_client import direct_get_object -from swift.common.internal_client import delete_object, put_object +from swift.common.internal_client import ( + delete_object, put_object, InternalClient, UnexpectedResponse) from swift.common.exceptions import ClientException from swift.common.ring import Ring from swift.common.ring.utils import is_local_device @@ -37,6 +38,53 @@ from swift.common.utils import ( from swift.common.daemon import Daemon from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND from swift.common.storage_policy import POLICIES +from swift.common.wsgi import ConfigString + + +ic_conf_body = """ +[DEFAULT] +# swift_dir = /etc/swift +# user = swift +# You can specify default log routing here if you want: +# log_name = swift +# log_facility = LOG_LOCAL0 +# log_level = INFO +# log_address = /dev/log +# +# comma separated list of functions to call to setup custom log handlers. +# functions get passed: conf, name, log_to_console, log_route, fmt, logger, +# adapted_logger +# log_custom_handlers = +# +# If set, log_udp_host will override log_address +# log_udp_host = +# log_udp_port = 514 +# +# You can enable StatsD logging here: +# log_statsd_host = localhost +# log_statsd_port = 8125 +# log_statsd_default_sample_rate = 1.0 +# log_statsd_sample_rate_factor = 1.0 +# log_statsd_metric_prefix = + +[pipeline:main] +pipeline = catch_errors proxy-logging cache proxy-server + +[app:proxy-server] +use = egg:swift#proxy +# See proxy-server.conf-sample for options + +[filter:cache] +use = egg:swift#memcache +# See proxy-server.conf-sample for options + +[filter:proxy-logging] +use = egg:swift#proxy_logging + +[filter:catch_errors] +use = egg:swift#catch_errors +# See proxy-server.conf-sample for options +""" class ContainerSync(Daemon): @@ -103,12 +151,12 @@ class ContainerSync(Daemon): loaded. This is overridden by unit tests. """ - def __init__(self, conf, container_ring=None): + def __init__(self, conf, container_ring=None, logger=None): #: The dict of configuration values from the [container-sync] section #: of the container-server.conf. self.conf = conf #: Logger to use for container-sync log lines. - self.logger = get_logger(conf, log_route='container-sync') + self.logger = logger or get_logger(conf, log_route='container-sync') #: Path to the local device mount points. self.devices = conf.get('devices', '/srv/node') #: Indicates whether mount points should be verified as actual mount @@ -158,6 +206,26 @@ class ContainerSync(Daemon): self._myport = int(conf.get('bind_port', 6001)) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) + request_tries = int(conf.get('request_tries') or 3) + + internal_client_conf_path = conf.get('internal_client_conf_path') + if not internal_client_conf_path: + self.logger.warning( + _('Configuration option internal_client_conf_path not ' + 'defined. Using default configuration, See ' + 'internal-client.conf-sample for options')) + internal_client_conf = ConfigString(ic_conf_body) + else: + internal_client_conf = internal_client_conf_path + try: + self.swift = InternalClient( + internal_client_conf, 'Swift Container Sync', request_tries) + except IOError as err: + if err.errno != errno.ENOENT: + raise + raise SystemExit( + _('Unable to load internal client from config: %r (%s)') % + (internal_client_conf_path, err)) def get_object_ring(self, policy_idx): """ @@ -378,39 +446,32 @@ class ContainerSync(Daemon): looking_for_timestamp = Timestamp(row['created_at']) timestamp = -1 headers = body = None - headers_out = {'X-Backend-Storage-Policy-Index': + # look up for the newest one + headers_out = {'X-Newest': True, + 'X-Backend-Storage-Policy-Index': str(info['storage_policy_index'])} - for node in nodes: - try: - these_headers, this_body = direct_get_object( - node, part, info['account'], info['container'], - row['name'], headers=headers_out, - resp_chunk_size=65536) - this_timestamp = Timestamp( - these_headers['x-timestamp']) - if this_timestamp > timestamp: - timestamp = this_timestamp - headers = these_headers - body = this_body - except ClientException as err: - # If any errors are not 404, make sure we report the - # non-404 one. We don't want to mistakenly assume the - # object no longer exists just because one says so and - # the others errored for some other reason. - if not exc or getattr( - exc, 'http_status', HTTP_NOT_FOUND) == \ - HTTP_NOT_FOUND: - exc = err - except (Exception, Timeout) as err: - exc = err + try: + source_obj_status, source_obj_info, source_obj_iter = \ + self.swift.get_object(info['account'], + info['container'], row['name'], + headers=headers_out, + acceptable_statuses=(2, 4)) + + except (Exception, UnexpectedResponse, Timeout) as err: + source_obj_info = {} + source_obj_iter = None + exc = err + timestamp = Timestamp(source_obj_info.get( + 'x-timestamp', 0)) + headers = source_obj_info + body = source_obj_iter if timestamp < looking_for_timestamp: if exc: raise exc raise Exception( - _('Unknown exception trying to GET: %(node)r ' + _('Unknown exception trying to GET: ' '%(account)r %(container)r %(object)r'), - {'node': node, 'part': part, - 'account': info['account'], + {'account': info['account'], 'container': info['container'], 'object': row['name']}) for key in ('date', 'last-modified'): |