summaryrefslogtreecommitdiff
path: root/swift/container/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'swift/container/sync.py')
-rw-r--r--swift/container/sync.py123
1 files changed, 92 insertions, 31 deletions
diff --git a/swift/container/sync.py b/swift/container/sync.py
index 4bf5fc5c3..edda5d1b6 100644
--- a/swift/container/sync.py
+++ b/swift/container/sync.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import errno
import os
import uuid
from swift import gettext_ as _
@@ -25,8 +26,8 @@ from eventlet import sleep, Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.common.container_sync_realms import ContainerSyncRealms
-from swift.common.direct_client import direct_get_object
-from swift.common.internal_client import delete_object, put_object
+from swift.common.internal_client import (
+ delete_object, put_object, InternalClient, UnexpectedResponse)
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
@@ -37,6 +38,53 @@ from swift.common.utils import (
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.storage_policy import POLICIES
+from swift.common.wsgi import ConfigString
+
+
+ic_conf_body = """
+[DEFAULT]
+# swift_dir = /etc/swift
+# user = swift
+# You can specify default log routing here if you want:
+# log_name = swift
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# log_address = /dev/log
+#
+# comma separated list of functions to call to setup custom log handlers.
+# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
+# adapted_logger
+# log_custom_handlers =
+#
+# If set, log_udp_host will override log_address
+# log_udp_host =
+# log_udp_port = 514
+#
+# You can enable StatsD logging here:
+# log_statsd_host = localhost
+# log_statsd_port = 8125
+# log_statsd_default_sample_rate = 1.0
+# log_statsd_sample_rate_factor = 1.0
+# log_statsd_metric_prefix =
+
+[pipeline:main]
+pipeline = catch_errors proxy-logging cache proxy-server
+
+[app:proxy-server]
+use = egg:swift#proxy
+# See proxy-server.conf-sample for options
+
+[filter:cache]
+use = egg:swift#memcache
+# See proxy-server.conf-sample for options
+
+[filter:proxy-logging]
+use = egg:swift#proxy_logging
+
+[filter:catch_errors]
+use = egg:swift#catch_errors
+# See proxy-server.conf-sample for options
+"""
class ContainerSync(Daemon):
@@ -103,12 +151,12 @@ class ContainerSync(Daemon):
loaded. This is overridden by unit tests.
"""
- def __init__(self, conf, container_ring=None):
+ def __init__(self, conf, container_ring=None, logger=None):
#: The dict of configuration values from the [container-sync] section
#: of the container-server.conf.
self.conf = conf
#: Logger to use for container-sync log lines.
- self.logger = get_logger(conf, log_route='container-sync')
+ self.logger = logger or get_logger(conf, log_route='container-sync')
#: Path to the local device mount points.
self.devices = conf.get('devices', '/srv/node')
#: Indicates whether mount points should be verified as actual mount
@@ -158,6 +206,26 @@ class ContainerSync(Daemon):
self._myport = int(conf.get('bind_port', 6001))
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
+ request_tries = int(conf.get('request_tries') or 3)
+
+ internal_client_conf_path = conf.get('internal_client_conf_path')
+ if not internal_client_conf_path:
+ self.logger.warning(
+ _('Configuration option internal_client_conf_path not '
+ 'defined. Using default configuration, See '
+ 'internal-client.conf-sample for options'))
+ internal_client_conf = ConfigString(ic_conf_body)
+ else:
+ internal_client_conf = internal_client_conf_path
+ try:
+ self.swift = InternalClient(
+ internal_client_conf, 'Swift Container Sync', request_tries)
+ except IOError as err:
+ if err.errno != errno.ENOENT:
+ raise
+ raise SystemExit(
+ _('Unable to load internal client from config: %r (%s)') %
+ (internal_client_conf_path, err))
def get_object_ring(self, policy_idx):
"""
@@ -378,39 +446,32 @@ class ContainerSync(Daemon):
looking_for_timestamp = Timestamp(row['created_at'])
timestamp = -1
headers = body = None
- headers_out = {'X-Backend-Storage-Policy-Index':
+ # look up for the newest one
+ headers_out = {'X-Newest': True,
+ 'X-Backend-Storage-Policy-Index':
str(info['storage_policy_index'])}
- for node in nodes:
- try:
- these_headers, this_body = direct_get_object(
- node, part, info['account'], info['container'],
- row['name'], headers=headers_out,
- resp_chunk_size=65536)
- this_timestamp = Timestamp(
- these_headers['x-timestamp'])
- if this_timestamp > timestamp:
- timestamp = this_timestamp
- headers = these_headers
- body = this_body
- except ClientException as err:
- # If any errors are not 404, make sure we report the
- # non-404 one. We don't want to mistakenly assume the
- # object no longer exists just because one says so and
- # the others errored for some other reason.
- if not exc or getattr(
- exc, 'http_status', HTTP_NOT_FOUND) == \
- HTTP_NOT_FOUND:
- exc = err
- except (Exception, Timeout) as err:
- exc = err
+ try:
+ source_obj_status, source_obj_info, source_obj_iter = \
+ self.swift.get_object(info['account'],
+ info['container'], row['name'],
+ headers=headers_out,
+ acceptable_statuses=(2, 4))
+
+ except (Exception, UnexpectedResponse, Timeout) as err:
+ source_obj_info = {}
+ source_obj_iter = None
+ exc = err
+ timestamp = Timestamp(source_obj_info.get(
+ 'x-timestamp', 0))
+ headers = source_obj_info
+ body = source_obj_iter
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(
- _('Unknown exception trying to GET: %(node)r '
+ _('Unknown exception trying to GET: '
'%(account)r %(container)r %(object)r'),
- {'node': node, 'part': part,
- 'account': info['account'],
+ {'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):