summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-10-02 13:54:51 +0000
committerGerrit Code Review <review@openstack.org>2015-10-02 13:54:51 +0000
commit0e3e2db913b15d0f5ef346233e87f45aaecbc2f9 (patch)
treea9834eee8f9c3c99763e52f47a6e9b5f75f423ca
parentfc7c8c23c9bf90755e384c74ecd7865c6b5f4b1e (diff)
parent29c10db0cbb1369a99c3c63d6f583951ba828b8e (diff)
downloadswift-0e3e2db913b15d0f5ef346233e87f45aaecbc2f9.tar.gz
Merge "Add POST capability to ssync for .meta files"
-rw-r--r--swift/common/utils.py24
-rw-r--r--swift/obj/diskfile.py103
-rw-r--r--swift/obj/mem_diskfile.py8
-rw-r--r--swift/obj/reconstructor.py32
-rw-r--r--swift/obj/server.py11
-rw-r--r--swift/obj/ssync_receiver.py128
-rw-r--r--swift/obj/ssync_sender.py109
-rw-r--r--test/probe/test_object_metadata_replication.py229
-rw-r--r--test/unit/common/test_utils.py38
-rw-r--r--test/unit/obj/test_diskfile.py240
-rwxr-xr-xtest/unit/obj/test_reconstructor.py8
-rwxr-xr-xtest/unit/obj/test_server.py112
-rw-r--r--test/unit/obj/test_ssync_receiver.py276
-rw-r--r--test/unit/obj/test_ssync_sender.py625
14 files changed, 1659 insertions, 284 deletions
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 4c8ea6221..2020908d8 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -692,6 +692,7 @@ def drop_buffer_cache(fd, offset, length):
NORMAL_FORMAT = "%016.05f"
INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x'
MAX_OFFSET = (16 ** 16) - 1
+PRECISION = 1e-5
# Setting this to True will cause the internal format to always display
# extended digits - even when the value is equivalent to the normalized form.
# This isn't ideal during an upgrade when some servers might not understand
@@ -736,7 +737,20 @@ class Timestamp(object):
compatible for normalized timestamps which do not include an offset.
"""
- def __init__(self, timestamp, offset=0):
+ def __init__(self, timestamp, offset=0, delta=0):
+ """
+ Create a new Timestamp.
+
+ :param timestamp: time in seconds since the Epoch, may be any of:
+
+ * a float or integer
+ * normalized/internalized string
+ * another instance of this class (offset is preserved)
+
+ :param offset: the second internal offset vector, an int
+ :param delta: deca-microsecond difference from the base timestamp
+ param, an int
+ """
if isinstance(timestamp, basestring):
parts = timestamp.split('_', 1)
self.timestamp = float(parts.pop(0))
@@ -754,6 +768,14 @@ class Timestamp(object):
raise ValueError('offset must be non-negative')
if self.offset > MAX_OFFSET:
raise ValueError('offset must be smaller than %d' % MAX_OFFSET)
+ self.raw = int(round(self.timestamp / PRECISION))
+ # add delta
+ if delta:
+ self.raw = self.raw + delta
+ if self.raw <= 0:
+ raise ValueError(
+ 'delta must be greater than %d' % (-1 * self.raw))
+ self.timestamp = float(self.raw * PRECISION)
def __repr__(self):
return INTERNAL_FORMAT % (self.timestamp, self.offset)
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 7d3aa1464..5660b588b 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -887,12 +887,18 @@ class BaseDiskFileManager(object):
def yield_hashes(self, device, partition, policy,
suffixes=None, **kwargs):
"""
- Yields tuples of (full_path, hash_only, timestamp) for object
+ Yields tuples of (full_path, hash_only, timestamps) for object
information stored for the given device, partition, and
(optionally) suffixes. If suffixes is None, all stored
suffixes will be searched for object hashes. Note that if
suffixes is not None but empty, such as [], then nothing will
be yielded.
+
+ timestamps is a dict which may contain items mapping:
+ ts_data -> timestamp of data or tombstone file,
+ ts_meta -> timestamp of meta file, if one exists
+ where timestamps are instances of
+ :class:`~swift.common.utils.Timestamp`
"""
dev_path = self.get_dev_path(device)
if not dev_path:
@@ -906,27 +912,36 @@ class BaseDiskFileManager(object):
suffixes = (
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
+ key_preference = (
+ ('ts_meta', '.meta'),
+ ('ts_data', '.data'),
+ ('ts_data', '.ts'),
+ )
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
object_path = os.path.join(suffix_path, object_hash)
- newest_valid_file = None
try:
results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs)
- newest_valid_file = (results.get('.meta')
- or results.get('.data')
- or results.get('.ts'))
- if newest_valid_file:
- timestamp = self.parse_on_disk_filename(
- newest_valid_file)['timestamp']
- yield (object_path, object_hash, timestamp.internal)
+ timestamps = {}
+ for ts_key, ext in key_preference:
+ if ext not in results:
+ continue
+ timestamps[ts_key] = self.parse_on_disk_filename(
+ results[ext])['timestamp']
+ if 'ts_data' not in timestamps:
+ # file sets that do not include a .data or .ts
+ # file can not be opened and therefore can not
+ # be ssync'd
+ continue
+ yield (object_path, object_hash, timestamps)
except AssertionError as err:
self.logger.debug('Invalid file set in %s (%s)' % (
object_path, err))
except DiskFileError as err:
self.logger.debug(
- 'Invalid diskfile filename %r in %r (%s)' % (
- newest_valid_file, object_path, err))
+ 'Invalid diskfile filename in %r (%s)' % (
+ object_path, err))
class BaseDiskFileWriter(object):
@@ -1414,6 +1429,8 @@ class BaseDiskFile(object):
self._datadir = None
self._tmpdir = join(device_path, get_tmp_dir(policy))
self._metadata = None
+ self._datafile_metadata = None
+ self._metafile_metadata = None
self._data_file = None
self._fp = None
self._quarantined_dir = None
@@ -1454,6 +1471,12 @@ class BaseDiskFile(object):
raise DiskFileNotOpen()
return Timestamp(self._metadata.get('X-Timestamp'))
+ @property
+ def data_timestamp(self):
+ if self._datafile_metadata is None:
+ raise DiskFileNotOpen()
+ return Timestamp(self._datafile_metadata.get('X-Timestamp'))
+
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
return cls(mgr, device_path, None, partition, _datadir=hash_dir_path,
@@ -1693,16 +1716,21 @@ class BaseDiskFile(object):
:func:`swift.obj.diskfile.DiskFile._verify_data_file`
"""
fp = open(data_file, 'rb')
- datafile_metadata = self._failsafe_read_metadata(fp, data_file)
+ self._datafile_metadata = self._failsafe_read_metadata(fp, data_file)
+ self._metadata = {}
if meta_file:
- self._metadata = self._failsafe_read_metadata(meta_file, meta_file)
+ self._metafile_metadata = self._failsafe_read_metadata(
+ meta_file, meta_file)
sys_metadata = dict(
- [(key, val) for key, val in datafile_metadata.items()
+ [(key, val) for key, val in self._datafile_metadata.items()
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
+ self._metadata.update(self._metafile_metadata)
self._metadata.update(sys_metadata)
+ # diskfile writer added 'name' to metafile, so remove it here
+ self._metafile_metadata.pop('name', None)
else:
- self._metadata = datafile_metadata
+ self._metadata.update(self._datafile_metadata)
if self._name is None:
# If we don't know our name, we were just given a hash dir at
# instantiation, so we'd better validate that the name hashes back
@@ -1712,6 +1740,37 @@ class BaseDiskFile(object):
self._verify_data_file(data_file, fp)
return fp
+ def get_metafile_metadata(self):
+ """
+ Provide the metafile metadata for a previously opened object as a
+ dictionary. This is metadata that was written by a POST and does not
+ include any persistent metadata that was set by the original PUT.
+
+ :returns: object's .meta file metadata dictionary, or None if there is
+ no .meta file
+ :raises DiskFileNotOpen: if the
+ :func:`swift.obj.diskfile.DiskFile.open` method was not previously
+ invoked
+ """
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ return self._metafile_metadata
+
+ def get_datafile_metadata(self):
+ """
+ Provide the datafile metadata for a previously opened object as a
+ dictionary. This is metadata that was included when the object was
+ first PUT, and does not include metadata set by any subsequent POST.
+
+ :returns: object's datafile metadata dictionary
+ :raises DiskFileNotOpen: if the
+ :func:`swift.obj.diskfile.DiskFile.open` method was not previously
+ invoked
+ """
+ if self._datafile_metadata is None:
+ raise DiskFileNotOpen()
+ return self._datafile_metadata
+
def get_metadata(self):
"""
Provide the metadata for a previously opened object as a dictionary.
@@ -1956,9 +2015,9 @@ class DiskFileManager(BaseDiskFileManager):
if have_valid_fileset() or not accept_first():
# newer .data or .ts already found so discard this
discard()
- # if not have_valid_fileset():
- # # remove any .meta that may have been previously found
- # context['.meta'] = None
+ if not have_valid_fileset():
+ # remove any .meta that may have been previously found
+ context.pop('.meta', None)
set_valid_fileset()
elif ext == '.meta':
if have_valid_fileset() or not accept_first():
@@ -1972,14 +2031,14 @@ class DiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ replicated diskfile contract.
:param accepted_files: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
# mimic legacy behavior - .meta is ignored when .ts is found
if accepted_files.get('.ts'):
- accepted_files['.meta'] = None
+ accepted_files.pop('.meta', None)
data_file, meta_file, ts_file, durable_file = tuple(
[accepted_files.get(ext)
@@ -2298,7 +2357,7 @@ class ECDiskFileManager(BaseDiskFileManager):
discard()
if not have_valid_fileset():
# remove any .meta that may have been previously found
- context['.meta'] = None
+ context.pop('.meta', None)
set_valid_fileset()
elif ext in ('.meta', '.durable'):
if have_valid_fileset() or not accept_first():
@@ -2312,7 +2371,7 @@ class ECDiskFileManager(BaseDiskFileManager):
def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs):
"""
Verify that the final combination of on disk files complies with the
- diskfile contract.
+ erasure-coded diskfile contract.
:param accepted_files: files that have been found and accepted
:param frag_index: specifies a specific fragment index .data file
diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py
index 97d209cd1..277a9f1fa 100644
--- a/swift/obj/mem_diskfile.py
+++ b/swift/obj/mem_diskfile.py
@@ -413,3 +413,11 @@ class DiskFile(object):
fp, md = self._filesystem.get_object(self._name)
if md and md['X-Timestamp'] < Timestamp(timestamp):
self._filesystem.del_object(self._name)
+
+ @property
+ def timestamp(self):
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ return Timestamp(self._metadata.get('X-Timestamp'))
+
+ data_timestamp = timestamp
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 719cdcf07..5793b3eda 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -71,24 +71,27 @@ class RebuildingECDiskFileStream(object):
metadata in the DiskFile interface for ssync.
"""
- def __init__(self, metadata, frag_index, rebuilt_fragment_iter):
+ def __init__(self, datafile_metadata, frag_index, rebuilt_fragment_iter):
# start with metadata from a participating FA
- self.metadata = metadata
+ self.datafile_metadata = datafile_metadata
# the new FA is going to have the same length as others in the set
- self._content_length = self.metadata['Content-Length']
+ self._content_length = self.datafile_metadata['Content-Length']
# update the FI and delete the ETag, the obj server will
# recalc on the other side...
- self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+ self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
for etag_key in ('ETag', 'Etag'):
- self.metadata.pop(etag_key, None)
+ self.datafile_metadata.pop(etag_key, None)
self.frag_index = frag_index
self.rebuilt_fragment_iter = rebuilt_fragment_iter
def get_metadata(self):
- return self.metadata
+ return self.datafile_metadata
+
+ def get_datafile_metadata(self):
+ return self.datafile_metadata
@property
def content_length(self):
@@ -218,7 +221,7 @@ class ObjectReconstructor(Daemon):
'full_path': self._full_path(node, part, path, policy)})
return resp
- def reconstruct_fa(self, job, node, metadata):
+ def reconstruct_fa(self, job, node, datafile_metadata):
"""
Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that is missing this object - the local
@@ -227,7 +230,8 @@ class ObjectReconstructor(Daemon):
:param job: job from ssync_sender
:param node: node that we're rebuilding to
- :param metadata: the metadata to attach to the rebuilt archive
+ :param datafile_metadata: the datafile metadata to attach to
+ the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
"""
@@ -244,7 +248,7 @@ class ObjectReconstructor(Daemon):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
pile = GreenAsyncPile(len(part_nodes))
- path = metadata['name']
+ path = datafile_metadata['name']
for node in part_nodes:
pile.spawn(self._get_response, node, job['partition'],
path, headers, job['policy'])
@@ -277,14 +281,14 @@ class ObjectReconstructor(Daemon):
'to reconstruct %s with ETag %s' % (
len(responses), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
- metadata['name'], job['policy']),
+ datafile_metadata['name'], job['policy']),
etag))
raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:job['policy'].ec_ndata], path, job['policy'],
fi_to_rebuild)
- return RebuildingECDiskFileStream(metadata, fi_to_rebuild,
+ return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter)
def _reconstruct(self, policy, fragment_payload, frag_index):
@@ -536,17 +540,17 @@ class ObjectReconstructor(Daemon):
:param frag_index: (int) the fragment index of data files to be deleted
"""
df_mgr = self._df_router[job['policy']]
- for object_hash, timestamp in objects.items():
+ for object_hash, timestamps in objects.items():
try:
df = df_mgr.get_diskfile_from_hash(
job['local_dev']['device'], job['partition'],
object_hash, job['policy'],
frag_index=frag_index)
- df.purge(Timestamp(timestamp), frag_index)
+ df.purge(timestamps['ts_data'], frag_index)
except DiskFileError:
self.logger.exception(
'Unable to purge DiskFile (%r %r %r)',
- object_hash, timestamp, frag_index)
+ object_hash, timestamps['ts_data'], frag_index)
continue
def process_job(self, job):
diff --git a/swift/obj/server.py b/swift/obj/server.py
index b5e340177..5bc76edd5 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -487,7 +487,11 @@ class ObjectController(BaseStorageServer):
self._preserve_slo_manifest(metadata, orig_metadata)
metadata.update(val for val in request.headers.items()
if is_user_meta('object', val[0]))
- for header_key in self.allowed_headers:
+ headers_to_copy = (
+ request.headers.get(
+ 'X-Backend-Replication-Headers', '').split() +
+ list(self.allowed_headers))
+ for header_key in headers_to_copy:
if header_key in request.headers:
header_caps = header_key.title()
metadata[header_caps] = request.headers[header_key]
@@ -549,10 +553,12 @@ class ObjectController(BaseStorageServer):
return HTTPInsufficientStorage(drive=device, request=request)
try:
orig_metadata = disk_file.read_metadata()
+ orig_timestamp = disk_file.data_timestamp
except DiskFileXattrNotSupported:
return HTTPInsufficientStorage(drive=device, request=request)
except (DiskFileNotExist, DiskFileQuarantined):
orig_metadata = {}
+ orig_timestamp = 0
# Checks for If-None-Match
if request.if_none_match is not None and orig_metadata:
@@ -563,7 +569,6 @@ class ObjectController(BaseStorageServer):
# The current ETag matches, so return 412
return HTTPPreconditionFailed(request=request)
- orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0))
if orig_timestamp >= req_timestamp:
return HTTPConflict(
request=request,
@@ -856,7 +861,7 @@ class ObjectController(BaseStorageServer):
orig_metadata = {}
response_class = HTTPNotFound
else:
- orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0))
+ orig_timestamp = disk_file.data_timestamp
if orig_timestamp < req_timestamp:
response_class = HTTPNoContent
else:
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 233373fe0..7e00e8451 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -24,6 +24,62 @@ from swift.common import http
from swift.common import swob
from swift.common import utils
from swift.common import request_helpers
+from swift.common.utils import Timestamp
+
+
+def decode_missing(line):
+ """
+ Parse a string of the form generated by
+ :py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict
+ with keys ``object_hash``, ``ts_data``, ``ts_meta``.
+
+ The encoder for this line is
+ :py:func:`~swift.obj.ssync_sender.encode_missing`
+ """
+ result = {}
+ parts = line.split()
+ result['object_hash'], t_data = (urllib.unquote(v) for v in parts[:2])
+ result['ts_data'] = result['ts_meta'] = Timestamp(t_data)
+ if len(parts) > 2:
+ # allow for a comma separated list of k:v pairs to future-proof
+ subparts = urllib.unquote(parts[2]).split(',')
+ for item in [subpart for subpart in subparts if ':' in subpart]:
+ k, v = item.split(':')
+ if k == 'm':
+ result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
+ return result
+
+
+def encode_wanted(remote, local):
+ """
+ Compare a remote and local results and generate a wanted line.
+
+ :param remote: a dict, with ts_data and ts_meta keys in the form
+ returned by :py:func:`decode_missing`
+ :param local: a dict, possibly empty, with ts_data and ts_meta keys
+ in the form returned :py:meth:`Receiver._check_local`
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_sender.decode_wanted`
+ """
+
+ want = {}
+ if 'ts_data' in local:
+ # we have something, let's get just the right stuff
+ if remote['ts_data'] > local['ts_data']:
+ want['data'] = True
+ if 'ts_meta' in local and remote['ts_meta'] > local['ts_meta']:
+ want['meta'] = True
+ else:
+ # we got nothing, so we'll take whatever the remote has
+ want['data'] = True
+ want['meta'] = True
+ if want:
+ # this is the inverse of _decode_wanted's key_map
+ key_map = dict(data='d', meta='m')
+ parts = ''.join(v for k, v in sorted(key_map.items()) if want.get(k))
+ return '%s %s' % (urllib.quote(remote['object_hash']), parts)
+ return None
class Receiver(object):
@@ -185,6 +241,42 @@ class Receiver(object):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
+ def _check_local(self, object_hash):
+ """
+ Parse local diskfile and return results of current
+ representative for comparison to remote.
+
+ :param object_hash: the hash of the remote object being offered
+ """
+ try:
+ df = self.diskfile_mgr.get_diskfile_from_hash(
+ self.device, self.partition, object_hash,
+ self.policy, frag_index=self.frag_index)
+ except exceptions.DiskFileNotExist:
+ return {}
+ try:
+ df.open()
+ except exceptions.DiskFileDeleted as err:
+ return {'ts_data': err.timestamp}
+ except exceptions.DiskFileError as err:
+ return {}
+ return {
+ 'ts_data': df.data_timestamp,
+ 'ts_meta': df.timestamp,
+ }
+
+ def _check_missing(self, line):
+ """
+ Parse offered object from sender, and compare to local diskfile,
+ responding with proper protocol line to represented needed data
+ or None if in sync.
+
+ Anchor point for tests to mock legacy protocol changes.
+ """
+ remote = decode_missing(line)
+ local = self._check_local(remote['object_hash'])
+ return encode_wanted(remote, local)
+
def missing_check(self):
"""
Handles the receiver-side of the MISSING_CHECK step of a
@@ -208,8 +300,14 @@ class Receiver(object):
4. Receiver gets `:MISSING_CHECK: END`, responds with
`:MISSING_CHECK: START`, followed by the list of
- hashes it collected as being wanted (one per line),
- `:MISSING_CHECK: END`, and flushes any buffers.
+ <wanted_hash> specifiers it collected as being wanted
+ (one per line), `:MISSING_CHECK: END`, and flushes any
+ buffers.
+
+ Each <wanted_hash> specifier has the form <hash>[ <parts>] where
+ <parts> is a string containing characters 'd' and/or 'm'
+ indicating that only data or meta part of object respectively is
+ required to be sync'd.
5. Sender gets `:MISSING_CHECK: START` and reads the list
of hashes desired by the receiver until reading
@@ -232,26 +330,9 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- parts = line.split()
- object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
- want = False
- try:
- df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy,
- frag_index=self.frag_index)
- except exceptions.DiskFileNotExist:
- want = True
- else:
- try:
- df.open()
- except exceptions.DiskFileDeleted as err:
- want = err.timestamp < timestamp
- except exceptions.DiskFileError as err:
- want = True
- else:
- want = df.timestamp < timestamp
+ want = self._check_missing(line)
if want:
- object_hashes.append(object_hash)
+ object_hashes.append(want)
yield ':MISSING_CHECK: START\r\n'
if object_hashes:
yield '\r\n'.join(object_hashes)
@@ -338,10 +419,11 @@ class Receiver(object):
if header == 'content-length':
content_length = int(value)
# Establish subrequest body, if needed.
- if method == 'DELETE':
+ if method in ('DELETE', 'POST'):
if content_length not in (None, 0):
raise Exception(
- 'DELETE subrequest with content-length %s' % path)
+ '%s subrequest with content-length %s'
+ % (method, path))
elif method == 'PUT':
if content_length is None:
raise Exception(
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 83030782d..82a1ae4b1 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -20,6 +20,48 @@ from swift.common import exceptions
from swift.common import http
+def encode_missing(object_hash, ts_data, ts_meta=None):
+ """
+ Returns a string representing the object hash, its data file timestamp
+ and the delta forwards to its metafile timestamp, if non-zero, in the form:
+ ``<hash> <timestamp> m:<hex delta>``
+
+ The decoder for this line is
+ :py:func:`~swift.obj.ssync_receiver.decode_missing`
+ """
+ msg = '%s %s' % (urllib.quote(object_hash), urllib.quote(ts_data.internal))
+ if ts_meta and ts_meta != ts_data:
+ delta = ts_meta.raw - ts_data.raw
+ msg = '%s m:%x' % (msg, delta)
+ return msg
+
+
+def decode_wanted(parts):
+ """
+ Parse missing_check line parts to determine which parts of local
+ diskfile were wanted by the receiver.
+
+ The encoder for parts is
+ :py:func:`~swift.obj.ssync_receiver.encode_wanted`
+ """
+ wanted = {}
+ key_map = dict(d='data', m='meta')
+ if parts:
+ # receiver specified data and/or meta wanted, so use those as
+ # conditions for sending PUT and/or POST subrequests
+ for k in key_map:
+ if k in parts[0]:
+ wanted[key_map[k]] = True
+ if not wanted:
+ # assume legacy receiver which will only accept PUTs. There is no
+ # way to send any meta file content without morphing the timestamp
+ # of either the data or the metadata, so we just send data file
+ # content to a legacy receiver. Once the receiver gets updated we
+ # will be able to send it the meta file content.
+ wanted['data'] = True
+ return wanted
+
+
class Sender(object):
"""
Sends SSYNC requests to the object server.
@@ -40,14 +82,15 @@ class Sender(object):
self.response_buffer = ''
self.response_chunk_left = 0
# available_map has an entry for each object in given suffixes that
- # is available to be sync'd; each entry is a hash => timestamp
+ # is available to be sync'd; each entry is a hash => dict of timestamps
+ # of data file or tombstone file and/or meta file
self.available_map = {}
# When remote_check_objs is given in job, ssync_sender trys only to
# make sure those objects exist or not in remote.
self.remote_check_objs = remote_check_objs
- # send_list has an entry for each object that the receiver wants to
- # be sync'ed; each entry is an object hash
- self.send_list = []
+ # send_map has an entry for each object that the receiver wants to
+ # be sync'ed; each entry maps an object hash => dict of wanted parts
+ self.send_map = {}
self.failures = 0
def __call__(self):
@@ -57,7 +100,8 @@ class Sender(object):
:returns: a 2-tuple, in the form (success, can_delete_objs) where
success is a boolean and can_delete_objs is the map of
objects that are in sync with the receiver. Each entry in
- can_delete_objs maps a hash => timestamp
+ can_delete_objs maps a hash => timestamp of data file or
+ tombstone file
"""
if not self.suffixes:
return True, {}
@@ -79,7 +123,7 @@ class Sender(object):
# *send* any requested updates; instead we only collect
# what's already in sync and safe for deletion
in_sync_hashes = (set(self.available_map.keys()) -
- set(self.send_list))
+ set(self.send_map.keys()))
can_delete_obj = dict((hash_, self.available_map[hash_])
for hash_ in in_sync_hashes)
if not self.failures:
@@ -220,17 +264,15 @@ class Sender(object):
frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(
- lambda path_objhash_timestamp:
- path_objhash_timestamp[1] in
+ lambda path_objhash_timestamps:
+ path_objhash_timestamps[1] in
self.remote_check_objs, hash_gen)
- for path, object_hash, timestamp in hash_gen:
- self.available_map[object_hash] = timestamp
+ for path, object_hash, timestamps in hash_gen:
+ self.available_map[object_hash] = timestamps
with exceptions.MessageTimeout(
self.daemon.node_timeout,
'missing_check send line'):
- msg = '%s %s\r\n' % (
- urllib.quote(object_hash),
- urllib.quote(timestamp))
+ msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'missing_check end'):
@@ -260,7 +302,7 @@ class Sender(object):
break
parts = line.split()
if parts:
- self.send_list.append(parts[0])
+ self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self):
"""
@@ -270,12 +312,13 @@ class Sender(object):
Full documentation of this can be found at
:py:meth:`.Receiver.updates`.
"""
- # First, send all our subrequests based on the send_list.
+ # First, send all our subrequests based on the send_map.
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates start'):
msg = ':UPDATES: START\r\n'
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
- for object_hash in self.send_list:
+ for object_hash, want in self.send_map.items():
+ object_hash = urllib.unquote(object_hash)
try:
df = self.df_mgr.get_diskfile_from_hash(
self.job['device'], self.job['partition'], object_hash,
@@ -286,16 +329,21 @@ class Sender(object):
'/%s/%s/%s' % (df.account, df.container, df.obj))
try:
df.open()
- # EC reconstructor may have passed a callback to build
- # an alternative diskfile...
- df = self.job.get('sync_diskfile_builder', lambda *args: df)(
- self.job, self.node, df.get_metadata())
+ if want.get('data'):
+ # EC reconstructor may have passed a callback to build an
+ # alternative diskfile - construct it using the metadata
+ # from the data file only.
+ df_alt = self.job.get(
+ 'sync_diskfile_builder', lambda *args: df)(
+ self.job, self.node, df.get_datafile_metadata())
+ self.send_put(url_path, df_alt)
+ if want.get('meta') and df.data_timestamp != df.timestamp:
+ self.send_post(url_path, df)
except exceptions.DiskFileDeleted as err:
- self.send_delete(url_path, err.timestamp)
+ if want.get('data'):
+ self.send_delete(url_path, err.timestamp)
except exceptions.DiskFileError:
pass
- else:
- self.send_put(url_path, df)
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'updates end'):
msg = ':UPDATES: END\r\n'
@@ -343,7 +391,7 @@ class Sender(object):
"""
msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)]
# Sorted to make it easier to test.
- for key, value in sorted(df.get_metadata().items()):
+ for key, value in sorted(df.get_datafile_metadata().items()):
if key not in ('name', 'Content-Length'):
msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n'
@@ -354,6 +402,19 @@ class Sender(object):
self.daemon.node_timeout, 'send_put chunk'):
self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ def send_post(self, url_path, df):
+ metadata = df.get_metafile_metadata()
+ if metadata is None:
+ return
+
+ msg = ['POST ' + url_path]
+ # Sorted to make it easier to test.
+ for key, value in sorted(metadata.items()):
+ msg.append('%s: %s' % (key, value))
+ msg = '\r\n'.join(msg) + '\r\n\r\n'
+ with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_post'):
+ self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+
def disconnect(self):
"""
Closes down the connection to the object server once done
diff --git a/test/probe/test_object_metadata_replication.py b/test/probe/test_object_metadata_replication.py
index c278e5f81..e7479d5fa 100644
--- a/test/probe/test_object_metadata_replication.py
+++ b/test/probe/test_object_metadata_replication.py
@@ -16,54 +16,24 @@
from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
-import functools
import unittest
import os
import shutil
import uuid
+from swift.common.exceptions import DiskFileDeleted
+from swift.container.backend import ContainerBroker
from swift.common import internal_client, utils
+from swift.common.ring import Ring
+from swift.common.utils import Timestamp, get_logger
+from swift.obj.diskfile import DiskFileManager
+from swift.common.storage_policy import POLICIES
from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest
-def _sync_methods(object_server_config_paths):
- """
- Get the set of all configured sync_methods for the object-replicator
- sections in the list of config paths.
- """
- sync_methods = set()
- for config_path in object_server_config_paths:
- options = utils.readconf(config_path, 'object-replicator')
- sync_methods.add(options.get('sync_method', 'rsync'))
- return sync_methods
-
-
-def expected_failure_with_ssync(m):
- """
- Wrapper for probetests that don't pass if you use ssync
- """
- @functools.wraps(m)
- def wrapper(self, *args, **kwargs):
- obj_conf = self.configs['object-server']
- config_paths = [v for k, v in obj_conf.items()
- if k in self.brain.handoff_numbers]
- using_ssync = 'ssync' in _sync_methods(config_paths)
- failed = False
- try:
- return m(self, *args, **kwargs)
- except AssertionError:
- failed = True
- if not using_ssync:
- raise
- finally:
- if using_ssync and not failed:
- self.fail('This test is expected to fail with ssync')
- return wrapper
-
-
class Test(ReplProbeTest):
def setUp(self):
"""
@@ -102,9 +72,97 @@ class Test(ReplProbeTest):
super(Test, self).tearDown()
shutil.rmtree(self.tempdir)
- def _put_object(self, headers=None):
+ def _get_object_info(self, account, container, obj, number,
+ policy=None):
+ obj_conf = self.configs['object-server']
+ config_path = obj_conf[number]
+ options = utils.readconf(config_path, 'app:object-server')
+ swift_dir = options.get('swift_dir', '/etc/swift')
+ ring = POLICIES.get_object_ring(policy, swift_dir)
+ part, nodes = ring.get_nodes(account, container, obj)
+ for node in nodes:
+ # assumes one to one mapping
+ if node['port'] == int(options.get('bind_port')):
+ device = node['device']
+ break
+ else:
+ return None
+ mgr = DiskFileManager(options, get_logger(options))
+ disk_file = mgr.get_diskfile(device, part, account, container, obj,
+ policy)
+ info = disk_file.read_metadata()
+ return info
+
+ def _assert_consistent_object_metadata(self):
+ obj_info = []
+ for i in range(1, 5):
+ info_i = self._get_object_info(self.account, self.container_name,
+ self.object_name, i)
+ if info_i:
+ obj_info.append(info_i)
+ self.assertTrue(len(obj_info) > 1)
+ for other in obj_info[1:]:
+ self.assertEqual(obj_info[0], other,
+ 'Object metadata mismatch: %s != %s'
+ % (obj_info[0], other))
+
+ def _assert_consistent_deleted_object(self):
+ for i in range(1, 5):
+ try:
+ info = self._get_object_info(self.account, self.container_name,
+ self.object_name, i)
+ if info is not None:
+ self.fail('Expected no disk file info but found %s' % info)
+ except DiskFileDeleted:
+ pass
+
+ def _get_db_info(self, account, container, number):
+ server_type = 'container'
+ obj_conf = self.configs['%s-server' % server_type]
+ config_path = obj_conf[number]
+ options = utils.readconf(config_path, 'app:container-server')
+ root = options.get('devices')
+
+ swift_dir = options.get('swift_dir', '/etc/swift')
+ ring = Ring(swift_dir, ring_name=server_type)
+ part, nodes = ring.get_nodes(account, container)
+ for node in nodes:
+ # assumes one to one mapping
+ if node['port'] == int(options.get('bind_port')):
+ device = node['device']
+ break
+ else:
+ return None
+
+ path_hash = utils.hash_path(account, container)
+ _dir = utils.storage_directory('%ss' % server_type, part, path_hash)
+ db_dir = os.path.join(root, device, _dir)
+ db_file = os.path.join(db_dir, '%s.db' % path_hash)
+ db = ContainerBroker(db_file)
+ return db.get_info()
+
+ def _assert_consistent_container_dbs(self):
+ db_info = []
+ for i in range(1, 5):
+ info_i = self._get_db_info(self.account, self.container_name, i)
+ if info_i:
+ db_info.append(info_i)
+ self.assertTrue(len(db_info) > 1)
+ for other in db_info[1:]:
+ self.assertEqual(db_info[0]['hash'], other['hash'],
+ 'Container db hash mismatch: %s != %s'
+ % (db_info[0]['hash'], other['hash']))
+
+ def _assert_object_metadata_matches_listing(self, listing, metadata):
+ self.assertEqual(listing['bytes'], int(metadata['content-length']))
+ self.assertEqual(listing['hash'], metadata['etag'])
+ self.assertEqual(listing['content_type'], metadata['content-type'])
+ modified = Timestamp(metadata['x-timestamp']).isoformat
+ self.assertEqual(listing['last_modified'], modified)
+
+ def _put_object(self, headers=None, body=u'stuff'):
headers = headers or {}
- self.int_client.upload_object(StringIO(u'stuff'), self.account,
+ self.int_client.upload_object(StringIO(body), self.account,
self.container_name,
self.object_name, headers)
@@ -171,7 +229,96 @@ class Test(ReplProbeTest):
self.brain.stop_handoff_half()
self._get_object()
- @expected_failure_with_ssync
+ def test_object_after_replication_with_subsequent_post(self):
+ self.brain.put_container(policy_index=0)
+
+ # put object
+ self._put_object(headers={'Content-Type': 'foo'}, body=u'older')
+
+ # put newer object to first server subset
+ self.brain.stop_primary_half()
+ self._put_object(headers={'Content-Type': 'bar'}, body=u'newer')
+ metadata = self._get_object_metadata()
+ etag = metadata['etag']
+ self.brain.start_primary_half()
+
+ # post some user meta to all servers
+ self._post_object({'x-object-meta-bar': 'meta-bar'})
+
+ # run replicator
+ self.get_to_final_state()
+
+ # check that newer data has been replicated to second server subset
+ self.brain.stop_handoff_half()
+ metadata = self._get_object_metadata()
+ self.assertEqual(etag, metadata['etag'])
+ self.assertEqual('bar', metadata['content-type'])
+ self.assertEqual('meta-bar', metadata['x-object-meta-bar'])
+ self.brain.start_handoff_half()
+
+ self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
+
+ def test_sysmeta_after_replication_with_subsequent_put(self):
+ sysmeta = {'x-object-sysmeta-foo': 'older'}
+ sysmeta2 = {'x-object-sysmeta-foo': 'newer'}
+ usermeta = {'x-object-meta-bar': 'meta-bar'}
+ self.brain.put_container(policy_index=0)
+
+ # put object with sysmeta to first server subset
+ self.brain.stop_primary_half()
+ self._put_object(headers=sysmeta)
+ metadata = self._get_object_metadata()
+ for key in sysmeta:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], sysmeta[key])
+ self.brain.start_primary_half()
+
+ # put object with updated sysmeta to second server subset
+ self.brain.stop_handoff_half()
+ self._put_object(headers=sysmeta2)
+ metadata = self._get_object_metadata()
+ for key in sysmeta2:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], sysmeta2[key])
+ self._post_object(usermeta)
+ metadata = self._get_object_metadata()
+ for key in usermeta:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], usermeta[key])
+ for key in sysmeta2:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], sysmeta2[key])
+
+ self.brain.start_handoff_half()
+
+ # run replicator
+ self.get_to_final_state()
+
+ # check sysmeta has been replicated to first server subset
+ self.brain.stop_primary_half()
+ metadata = self._get_object_metadata()
+ for key in usermeta:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], usermeta[key])
+ for key in sysmeta2.keys():
+ self.assertTrue(key in metadata, key)
+ self.assertEqual(metadata[key], sysmeta2[key])
+ self.brain.start_primary_half()
+
+ # check user sysmeta ok on second server subset
+ self.brain.stop_handoff_half()
+ metadata = self._get_object_metadata()
+ for key in usermeta:
+ self.assertTrue(key in metadata)
+ self.assertEqual(metadata[key], usermeta[key])
+ for key in sysmeta2.keys():
+ self.assertTrue(key in metadata, key)
+ self.assertEqual(metadata[key], sysmeta2[key])
+
+ self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
+
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
@@ -218,6 +365,8 @@ class Test(ReplProbeTest):
for key in expected.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key])
+ self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
def test_sysmeta_after_replication_with_prior_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
@@ -267,6 +416,8 @@ class Test(ReplProbeTest):
self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta:
self.assertFalse(key in metadata)
+ self._assert_consistent_object_metadata()
+ self._assert_consistent_container_dbs()
if __name__ == "__main__":
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index d1fdb1f41..3683f288e 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -405,6 +405,44 @@ class TestTimestamp(unittest.TestCase):
'%r is not bigger than %f given %r' % (
timestamp, float(normal), value))
+ def test_raw(self):
+ expected = 140243640891203
+ timestamp = utils.Timestamp(1402436408.91203)
+ self.assertEqual(expected, timestamp.raw)
+
+ # 'raw' does not include offset
+ timestamp = utils.Timestamp(1402436408.91203, 0xf0)
+ self.assertEqual(expected, timestamp.raw)
+
+ def test_delta(self):
+ def _assertWithinBounds(expected, timestamp):
+ tolerance = 0.00001
+ minimum = expected - tolerance
+ maximum = expected + tolerance
+ self.assertTrue(float(timestamp) > minimum)
+ self.assertTrue(float(timestamp) < maximum)
+
+ timestamp = utils.Timestamp(1402436408.91203, delta=100)
+ _assertWithinBounds(1402436408.91303, timestamp)
+ self.assertEqual(140243640891303, timestamp.raw)
+
+ timestamp = utils.Timestamp(1402436408.91203, delta=-100)
+ _assertWithinBounds(1402436408.91103, timestamp)
+ self.assertEqual(140243640891103, timestamp.raw)
+
+ timestamp = utils.Timestamp(1402436408.91203, delta=0)
+ _assertWithinBounds(1402436408.91203, timestamp)
+ self.assertEqual(140243640891203, timestamp.raw)
+
+ # delta is independent of offset
+ timestamp = utils.Timestamp(1402436408.91203, offset=42, delta=100)
+ self.assertEqual(140243640891303, timestamp.raw)
+ self.assertEqual(42, timestamp.offset)
+
+ # cannot go negative
+ self.assertRaises(ValueError, utils.Timestamp, 1402436408.91203,
+ delta=-140243640891203)
+
def test_int(self):
expected = 1402437965
test_values = (
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 28c437f98..5c88dff9c 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -33,12 +33,13 @@ from shutil import rmtree
from time import time
from tempfile import mkdtemp
from hashlib import md5
-from contextlib import closing, nested
+from contextlib import closing, nested, contextmanager
from gzip import GzipFile
from eventlet import hubs, timeout, tpool
from test.unit import (FakeLogger, mock as unit_mock, temptree,
- patch_policies, debug_logger, EMPTY_ETAG)
+ patch_policies, debug_logger, EMPTY_ETAG,
+ make_timestamp_iter)
from nose import SkipTest
from swift.obj import diskfile
@@ -921,9 +922,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
return files
self.fail('Unexpected listdir of %r' % path)
expected_items = [
- (os.path.join(part_path, hash_[-3:], hash_), hash_,
- Timestamp(ts).internal)
- for hash_, ts in expected.items()]
+ (os.path.join(part_path, hash_[-3:], hash_), hash_, timestamps)
+ for hash_, timestamps in expected.items()]
with nested(
mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')):
@@ -932,8 +932,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
device, part, policy, **kwargs))
expected = sorted(expected_items)
actual = sorted(hash_items)
- self.assertEqual(actual, expected,
- 'Expected %s but got %s' % (expected, actual))
+ # default list diff easiest to debug
+ self.assertEqual(actual, expected)
def test_yield_hashes_tombstones(self):
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
@@ -965,9 +965,9 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
}
}
expected = {
- '1111111111111111111111111111127e': ts1.internal,
- '2222222222222222222222222222227e': ts2.internal,
- '3333333333333333333333333333300b': ts3.internal,
+ '1111111111111111111111111111127e': {'ts_data': ts1.internal},
+ '2222222222222222222222222222227e': {'ts_data': ts2.internal},
+ '3333333333333333333333333333300b': {'ts_data': ts3.internal},
}
for policy in POLICIES:
self._check_yield_hashes(policy, suffix_map, expected,
@@ -1084,9 +1084,9 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
- '9373a92d072897b136b3fc06595b4abc': fresh_ts,
- '9373a92d072897b136b3fc06595b0456': old_ts,
- '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@@ -1097,24 +1097,30 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts3 = next(ts_iter)
suffix_map = {
'abc': {
- '9373a92d072897b136b3fc06595b4abc': [
+ # only tombstone is yield/sync -able
+ '9333a92d072897b136b3fc06595b4abc': [
ts1.internal + '.ts',
ts2.internal + '.meta'],
},
'456': {
- '9373a92d072897b136b3fc06595b0456': [
+ # only latest metadata timestamp
+ '9444a92d072897b136b3fc06595b0456': [
ts1.internal + '.data',
ts2.internal + '.meta',
ts3.internal + '.meta'],
- '9373a92d072897b136b3fc06595b7456': [
+ # exemplary datadir with .meta
+ '9555a92d072897b136b3fc06595b7456': [
ts1.internal + '.data',
ts2.internal + '.meta'],
},
}
expected = {
- '9373a92d072897b136b3fc06595b4abc': ts2,
- '9373a92d072897b136b3fc06595b0456': ts3,
- '9373a92d072897b136b3fc06595b7456': ts2,
+ '9333a92d072897b136b3fc06595b4abc':
+ {'ts_data': ts1},
+ '9444a92d072897b136b3fc06595b0456':
+ {'ts_data': ts1, 'ts_meta': ts3},
+ '9555a92d072897b136b3fc06595b7456':
+ {'ts_data': ts1, 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
@@ -1138,8 +1144,8 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': old_ts,
- '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456'])
@@ -1156,7 +1162,7 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': ts1,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
}
try:
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
@@ -1369,6 +1375,27 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_get_ondisk_files_with_stray_meta(self):
# get_ondisk_files does not tolerate a stray .meta file
+ class_under_test = self._get_diskfile(POLICIES.default)
+
+ @contextmanager
+ def create_files(df, files):
+ os.makedirs(df._datadir)
+ for fname in files:
+ fpath = os.path.join(df._datadir, fname)
+ with open(fpath, 'w') as f:
+ diskfile.write_metadata(f, {'name': df._name,
+ 'Content-Length': 0})
+ yield
+ rmtree(df._datadir, ignore_errors=True)
+
+ # sanity
+ files = [
+ '0000000006.00000#1.data',
+ '0000000006.00000.durable',
+ ]
+ with create_files(class_under_test, files):
+ class_under_test.open()
+
scenarios = [['0000000007.00000.meta'],
['0000000007.00000.meta',
@@ -1382,8 +1409,13 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'0000000005.00000#1.data']
]
for files in scenarios:
- class_under_test = self._get_diskfile(POLICIES.default)
- self.assertRaises(DiskFileNotExist, class_under_test.open)
+ with create_files(class_under_test, files):
+ try:
+ class_under_test.open()
+ except DiskFileNotExist:
+ continue
+ self.fail('expected DiskFileNotExist opening %s with %r' % (
+ class_under_test.__class__.__name__, files))
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
@@ -1550,9 +1582,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
- '9373a92d072897b136b3fc06595b4abc': fresh_ts,
- '9373a92d072897b136b3fc06595b0456': old_ts,
- '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts},
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -1581,22 +1613,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
- # TODO: differs from repl DiskFileManager which *will*
- # return meta timestamp when only meta and ts on disk
- '9373a92d072897b136b3fc06595b4abc': ts1,
- '9373a92d072897b136b3fc06595b0456': ts3,
- '9373a92d072897b136b3fc06595b7456': ts2,
+ '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1,
+ 'ts_meta': ts3},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1,
+ 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected)
- # but meta timestamp is not returned if specified frag index
+ # but meta timestamp is *not* returned if specified frag index
# is not found
expected = {
- # TODO: differs from repl DiskFileManager which *will*
- # return meta timestamp when only meta and ts on disk
- '9373a92d072897b136b3fc06595b4abc': ts1,
- '9373a92d072897b136b3fc06595b0456': ts3,
- '9373a92d072897b136b3fc06595b7456': ts2,
+ '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=3)
@@ -1623,8 +1651,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'def': {},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': old_ts,
- '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
suffixes=['456'], frag_index=2)
@@ -1642,7 +1670,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': ts1,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -1651,8 +1679,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append(
ts1.internal + '.durable')
expected = {
- '9373a92d072897b136b3fc06595b0456': ts1,
- '9373a92d072897b136b3fc06595b7456': ts1,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
+ '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -1672,7 +1700,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': ts1,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@@ -1683,7 +1711,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append(
ts2.internal + '.durable')
expected = {
- '9373a92d072897b136b3fc06595b0456': ts2,
+ '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=None)
@@ -1698,27 +1726,40 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
ts2 = next(ts_iter)
suffix_map = {
'456': {
- '9373a92d072897b136b3fc06595b0456': [
+ # this one is fine
+ '9333a92d072897b136b3fc06595b0456': [
ts1.internal + '#2.data',
ts1.internal + '.durable'],
- '9373a92d072897b136b3fc06595b7456': [
+ # missing frag index
+ '9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'],
- '9373a92d072897b136b3fc06595b8456': [
+ '9555a92d072897b136b3fc06595b8456': [
'junk_file'],
- '9373a92d072897b136b3fc06595b9456': [
- ts1.internal + '.data',
+ # missing .durable
+ '9666a92d072897b136b3fc06595b9456': [
+ ts1.internal + '#2.data',
ts2.internal + '.meta'],
- '9373a92d072897b136b3fc06595ba456': [
+ # .meta files w/o .data files can't be opened, and are ignored
+ '9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'],
- '9373a92d072897b136b3fc06595bb456': [
+ '9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta',
ts2.internal + '.meta'],
+ # this is good with meta
+ '9999a92d072897b136b3fc06595bb456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '.meta'],
+ # this one is wrong frag index
+ '9aaaa92d072897b136b3fc06595b0456': [
+ ts1.internal + '#7.data',
+ ts1.internal + '.durable'],
},
}
expected = {
- '9373a92d072897b136b3fc06595b0456': ts1,
- '9373a92d072897b136b3fc06595ba456': ts1,
- '9373a92d072897b136b3fc06595bb456': ts2,
+ '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1},
+ '9999a92d072897b136b3fc06595bb456': {'ts_data': ts1,
+ 'ts_meta': ts2},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -1758,9 +1799,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
},
}
expected = {
- '1111111111111111111111111111127e': ts1,
- '2222222222222222222222222222227e': ts2,
- '3333333333333333333333333333300b': ts3,
+ '1111111111111111111111111111127e': {'ts_data': ts1},
+ '2222222222222222222222222222227e': {'ts_data': ts2},
+ '3333333333333333333333333333300b': {'ts_data': ts3},
}
self._check_yield_hashes(POLICIES.default, suffix_map, expected,
frag_index=2)
@@ -1976,17 +2017,62 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def test_get_metadata_not_opened(self):
df = self._simple_get_diskfile()
- self.assertRaises(DiskFileNotOpen, df.get_metadata)
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metadata()
+
+ def test_get_datafile_metadata(self):
+ ts_iter = make_timestamp_iter()
+ body = '1234567890'
+ ts_data = ts_iter.next()
+ metadata = {'X-Object-Meta-Test': 'test1',
+ 'X-Object-Sysmeta-Test': 'test1'}
+ df = self._create_test_file(body, timestamp=ts_data.internal,
+ metadata=metadata)
+ expected = df.get_metadata()
+ ts_meta = ts_iter.next()
+ df.write_metadata({'X-Timestamp': ts_meta.internal,
+ 'X-Object-Meta-Test': 'changed',
+ 'X-Object-Sysmeta-Test': 'ignored'})
+ df.open()
+ self.assertEqual(expected, df.get_datafile_metadata())
+ expected.update({'X-Timestamp': ts_meta.internal,
+ 'X-Object-Meta-Test': 'changed'})
+ self.assertEqual(expected, df.get_metadata())
+
+ def test_get_datafile_metadata_not_opened(self):
+ df = self._simple_get_diskfile()
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_datafile_metadata()
+
+ def test_get_metafile_metadata(self):
+ ts_iter = make_timestamp_iter()
+ body = '1234567890'
+ ts_data = ts_iter.next()
+ metadata = {'X-Object-Meta-Test': 'test1',
+ 'X-Object-Sysmeta-Test': 'test1'}
+ df = self._create_test_file(body, timestamp=ts_data.internal,
+ metadata=metadata)
+ self.assertIsNone(df.get_metafile_metadata())
+
+ # now create a meta file
+ ts_meta = ts_iter.next()
+ df.write_metadata({'X-Timestamp': ts_meta.internal,
+ 'X-Object-Meta-Test': 'changed'})
+ df.open()
+ expected = {'X-Timestamp': ts_meta.internal,
+ 'X-Object-Meta-Test': 'changed'}
+ self.assertEqual(expected, df.get_metafile_metadata())
+
+ def test_get_metafile_metadata_not_opened(self):
+ df = self._simple_get_diskfile()
+ with self.assertRaises(DiskFileNotOpen):
+ df.get_metafile_metadata()
def test_not_opened(self):
df = self._simple_get_diskfile()
- try:
+ with self.assertRaises(DiskFileNotOpen):
with df:
pass
- except DiskFileNotOpen:
- pass
- else:
- self.fail("Expected DiskFileNotOpen exception")
def test_disk_file_default_disallowed_metadata(self):
# build an object with some meta (at t0+1s)
@@ -3078,11 +3164,31 @@ class DiskFileMixin(BaseDiskFileTestMixin):
self.assertEqual(str(exc), '')
def test_diskfile_timestamp(self):
- ts = Timestamp(time())
- self._get_open_disk_file(ts=ts.internal)
+ ts_1 = self.ts()
+ self._get_open_disk_file(ts=ts_1.internal)
+ df = self._simple_get_diskfile()
+ with df.open():
+ self.assertEqual(df.timestamp, ts_1.internal)
+ ts_2 = self.ts()
+ df.write_metadata({'X-Timestamp': ts_2.internal})
+ with df.open():
+ self.assertEqual(df.timestamp, ts_2.internal)
+
+ def test_data_timestamp(self):
+ ts_1 = self.ts()
+ self._get_open_disk_file(ts=ts_1.internal)
df = self._simple_get_diskfile()
with df.open():
- self.assertEqual(df.timestamp, ts.internal)
+ self.assertEqual(df.data_timestamp, ts_1.internal)
+ ts_2 = self.ts()
+ df.write_metadata({'X-Timestamp': ts_2.internal})
+ with df.open():
+ self.assertEqual(df.data_timestamp, ts_1.internal)
+
+ def test_data_timestamp_not_open(self):
+ df = self._simple_get_diskfile()
+ with self.assertRaises(DiskFileNotOpen):
+ df.data_timestamp
def test_error_in_hash_cleanup_listdir(self):
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 16aaa0f60..c19e8b812 100755
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -840,8 +840,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index'))
self.available_map = {}
- for path, hash_, ts in hash_gen:
- self.available_map[hash_] = ts
+ for path, hash_, timestamps in hash_gen:
+ self.available_map[hash_] = timestamps
context['available_map'] = self.available_map
ssync_calls.append(context)
@@ -2403,7 +2403,7 @@ class TestObjectReconstructor(unittest.TestCase):
}
def ssync_response_callback(*args):
- return True, {ohash: ts}
+ return True, {ohash: {'ts_data': ts}}
ssync_calls = []
with mock_ssync_sender(ssync_calls,
@@ -2459,7 +2459,7 @@ class TestObjectReconstructor(unittest.TestCase):
}
def ssync_response_callback(*args):
- return True, {ohash: ts}
+ return True, {ohash: {'ts_data': ts}}
ssync_calls = []
with mock_ssync_sender(ssync_calls,
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 1c7a768cf..9b20dae66 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -44,7 +44,8 @@ from nose import SkipTest
from swift import __version__ as swift_version
from swift.common.http import is_success
-from test.unit import FakeLogger, debug_logger, mocked_http_conn
+from test.unit import FakeLogger, debug_logger, mocked_http_conn, \
+ make_timestamp_iter
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
from swift.obj import diskfile
@@ -339,6 +340,41 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
+ def test_POST_conflicts_with_later_POST(self):
+ ts_iter = make_timestamp_iter()
+ t_put = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'X-Timestamp': t_put,
+ 'Content-Length': 0,
+ 'Content-Type': 'plain/text'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ t_post1 = ts_iter.next().internal
+ t_post2 = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'POST'},
+ headers={'X-Timestamp': t_post2})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 202)
+
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'POST'},
+ headers={'X-Timestamp': t_post1})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 409)
+
+ obj_dir = os.path.join(
+ self.testdir, 'sda1',
+ storage_directory(diskfile.get_data_dir(0), 'p',
+ hash_path('a', 'c', 'o')))
+
+ ts_file = os.path.join(obj_dir, t_post2 + '.meta')
+ self.assertTrue(os.path.isfile(ts_file))
+ meta_file = os.path.join(obj_dir, t_post1 + '.meta')
+ self.assertFalse(os.path.isfile(meta_file))
+
def test_POST_not_exist(self):
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/fail',
@@ -1082,6 +1118,44 @@ class TestObjectController(unittest.TestCase):
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
+ def test_PUT_succeeds_with_later_POST(self):
+ ts_iter = make_timestamp_iter()
+ t_put = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'X-Timestamp': t_put,
+ 'Content-Length': 0,
+ 'Content-Type': 'plain/text'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ t_put2 = ts_iter.next().internal
+ t_post = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'POST'},
+ headers={'X-Timestamp': t_post})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 202)
+
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'X-Timestamp': t_put2,
+ 'Content-Length': 0,
+ 'Content-Type': 'plain/text'},
+ )
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ obj_dir = os.path.join(
+ self.testdir, 'sda1',
+ storage_directory(diskfile.get_data_dir(0), 'p',
+ hash_path('a', 'c', 'o')))
+
+ ts_file = os.path.join(obj_dir, t_put2 + '.data')
+ self.assertTrue(os.path.isfile(ts_file))
+ meta_file = os.path.join(obj_dir, t_post + '.meta')
+ self.assertTrue(os.path.isfile(meta_file))
+
def test_POST_system_metadata(self):
# check that diskfile sysmeta is not changed by a POST
timestamp1 = normalize_timestamp(time())
@@ -2394,6 +2468,42 @@ class TestObjectController(unittest.TestCase):
self.assertTrue(os.path.isfile(ts_1003_file))
self.assertEqual(len(os.listdir(os.path.dirname(ts_1003_file))), 1)
+ def test_DELETE_succeeds_with_later_POST(self):
+ ts_iter = make_timestamp_iter()
+ t_put = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'X-Timestamp': t_put,
+ 'Content-Length': 0,
+ 'Content-Type': 'plain/text'})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 201)
+
+ t_delete = ts_iter.next().internal
+ t_post = ts_iter.next().internal
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'POST'},
+ headers={'X-Timestamp': t_post})
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 202)
+
+ req = Request.blank('/sda1/p/a/c/o',
+ environ={'REQUEST_METHOD': 'DELETE'},
+ headers={'X-Timestamp': t_delete},
+ )
+ resp = req.get_response(self.object_controller)
+ self.assertEqual(resp.status_int, 204)
+
+ obj_dir = os.path.join(
+ self.testdir, 'sda1',
+ storage_directory(diskfile.get_data_dir(0), 'p',
+ hash_path('a', 'c', 'o')))
+
+ ts_file = os.path.join(obj_dir, t_delete + '.ts')
+ self.assertTrue(os.path.isfile(ts_file))
+ meta_file = os.path.join(obj_dir, t_post + '.meta')
+ self.assertTrue(os.path.isfile(meta_file))
+
def test_DELETE_container_updates(self):
# Test swift.obj.server.ObjectController.DELETE and container
# updates, making sure container update is called in the correct
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 6185f097f..9a79c5a60 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -36,7 +36,7 @@ from swift.obj import ssync_receiver, ssync_sender
from swift.obj.reconstructor import ObjectReconstructor
from test import unit
-from test.unit import debug_logger, patch_policies
+from test.unit import debug_logger, patch_policies, make_timestamp_iter
@unit.patch_policies()
@@ -611,8 +611,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash1,
- self.hash2,
+ self.hash1 + ' dm',
+ self.hash2 + ' dm',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
@@ -637,8 +637,8 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash1,
- self.hash2,
+ self.hash1 + ' dm',
+ self.hash2 + ' dm',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
@@ -670,7 +670,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash2,
+ self.hash2 + ' dm',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
@@ -706,7 +706,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash2,
+ self.hash2 + ' dm',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
@@ -740,14 +740,14 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash2,
+ self.hash2 + ' dm',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.error.called)
self.assertFalse(self.controller.logger.exception.called)
- def test_MISSING_CHECK_have_one_older(self):
+ def test_MISSING_CHECK_have_newer_meta(self):
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
diskfile.get_data_dir(POLICIES[0])),
@@ -760,22 +760,67 @@ class TestReceiver(unittest.TestCase):
fp.flush()
self.metadata1['Content-Length'] = '1'
diskfile.write_metadata(fp, self.metadata1)
+ # write newer .meta file
+ metadata = {'name': self.name1, 'X-Timestamp': self.ts2,
+ 'X-Object-Meta-Test': 'test'}
+ fp = open(os.path.join(object_dir, self.ts2 + '.meta'), 'w+')
+ diskfile.write_metadata(fp, metadata)
+
+ # receiver has .data at older_ts, .meta at ts2
+ # sender has .data at ts1
+ self.controller.logger = mock.MagicMock()
+ req = swob.Request.blank(
+ '/sda1/1',
+ environ={'REQUEST_METHOD': 'SSYNC'},
+ body=':MISSING_CHECK: START\r\n' +
+ self.hash1 + ' ' + self.ts1 + '\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n:UPDATES: END\r\n')
+ resp = req.get_response(self.controller)
+ self.assertEqual(
+ self.body_lines(resp.body),
+ [':MISSING_CHECK: START',
+ self.hash1 + ' d',
+ ':MISSING_CHECK: END',
+ ':UPDATES: START', ':UPDATES: END'])
+ self.assertEqual(resp.status_int, 200)
+ self.assertFalse(self.controller.logger.error.called)
+ self.assertFalse(self.controller.logger.exception.called)
+ def test_MISSING_CHECK_have_older_meta(self):
+ object_dir = utils.storage_directory(
+ os.path.join(self.testdir, 'sda1',
+ diskfile.get_data_dir(POLICIES[0])),
+ '1', self.hash1)
+ utils.mkdirs(object_dir)
+ older_ts1 = utils.normalize_timestamp(float(self.ts1) - 1)
+ self.metadata1['X-Timestamp'] = older_ts1
+ fp = open(os.path.join(object_dir, older_ts1 + '.data'), 'w+')
+ fp.write('1')
+ fp.flush()
+ self.metadata1['Content-Length'] = '1'
+ diskfile.write_metadata(fp, self.metadata1)
+ # write .meta file at ts1
+ metadata = {'name': self.name1, 'X-Timestamp': self.ts1,
+ 'X-Object-Meta-Test': 'test'}
+ fp = open(os.path.join(object_dir, self.ts1 + '.meta'), 'w+')
+ diskfile.write_metadata(fp, metadata)
+
+ # receiver has .data at older_ts, .meta at ts1
+ # sender has .data at older_ts, .meta at ts2
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n' +
- self.hash1 + ' ' + self.ts1 + '\r\n' +
- self.hash2 + ' ' + self.ts2 + '\r\n'
+ self.hash1 + ' ' + older_ts1 + ' m:30d40\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[':MISSING_CHECK: START',
- self.hash1,
- self.hash2,
+ self.hash1 + ' m',
':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
@@ -1303,6 +1348,46 @@ class TestReceiver(unittest.TestCase):
actual = df.get_metadata()
self.assertEqual(expected, actual)
+ def test_UPDATES_POST(self):
+ _POST_request = [None]
+
+ @server.public
+ def _POST(request):
+ _POST_request[0] = request
+ return swob.HTTPAccepted()
+
+ with mock.patch.object(self.controller, 'POST', _POST):
+ self.controller.logger = mock.MagicMock()
+ req = swob.Request.blank(
+ '/device/partition',
+ environ={'REQUEST_METHOD': 'SSYNC'},
+ body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n'
+ 'POST /a/c/o\r\n'
+ 'X-Timestamp: 1364456113.12344\r\n'
+ 'X-Object-Meta-Test1: one\r\n'
+ 'Specialty-Header: value\r\n\r\n')
+ resp = req.get_response(self.controller)
+ self.assertEqual(
+ self.body_lines(resp.body),
+ [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+ ':UPDATES: START', ':UPDATES: END'])
+ self.assertEqual(resp.status_int, 200)
+ self.assertFalse(self.controller.logger.exception.called)
+ self.assertFalse(self.controller.logger.error.called)
+ req = _POST_request[0]
+ self.assertEqual(req.path, '/device/partition/a/c/o')
+ self.assertEqual(req.content_length, None)
+ self.assertEqual(req.headers, {
+ 'X-Timestamp': '1364456113.12344',
+ 'X-Object-Meta-Test1': 'one',
+ 'Specialty-Header': 'value',
+ 'Host': 'localhost:80',
+ 'X-Backend-Storage-Policy-Index': '0',
+ 'X-Backend-Replication': 'True',
+ 'X-Backend-Replication-Headers': (
+ 'x-timestamp x-object-meta-test1 specialty-header')})
+
def test_UPDATES_with_storage_policy(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
@@ -1490,12 +1575,18 @@ class TestReceiver(unittest.TestCase):
return swob.HTTPCreated()
@server.public
+ def _POST(request):
+ _requests.append(request)
+ return swob.HTTPOk()
+
+ @server.public
def _DELETE(request):
_requests.append(request)
return swob.HTTPNoContent()
with contextlib.nested(
mock.patch.object(self.controller, 'PUT', _PUT),
+ mock.patch.object(self.controller, 'POST', _POST),
mock.patch.object(self.controller, 'DELETE', _DELETE)):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
@@ -1529,7 +1620,17 @@ class TestReceiver(unittest.TestCase):
'\r\n'
'DELETE /a/c/o6\r\n'
'X-Timestamp: 1364456113.00006\r\n'
- '\r\n')
+ '\r\n'
+ 'PUT /a/c/o7\r\n'
+ 'Content-Length: 7\r\n'
+ 'X-Timestamp: 1364456113.00007\r\n'
+ '\r\n'
+ '1234567'
+ 'POST /a/c/o7\r\n'
+ 'X-Object-Meta-Test-User: user_meta\r\n'
+ 'X-Timestamp: 1364456113.00008\r\n'
+ '\r\n'
+ )
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
@@ -1538,7 +1639,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(self.controller.logger.exception.called)
self.assertFalse(self.controller.logger.error.called)
- self.assertEqual(len(_requests), 6) # sanity
+ self.assertEqual(len(_requests), 8) # sanity
req = _requests.pop(0)
self.assertEqual(req.method, 'PUT')
self.assertEqual(req.path, '/device/partition/a/c/o1')
@@ -1609,6 +1710,31 @@ class TestReceiver(unittest.TestCase):
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
+ req = _requests.pop(0)
+ self.assertEqual(req.method, 'PUT')
+ self.assertEqual(req.path, '/device/partition/a/c/o7')
+ self.assertEqual(req.content_length, 7)
+ self.assertEqual(req.headers, {
+ 'Content-Length': '7',
+ 'X-Timestamp': '1364456113.00007',
+ 'Host': 'localhost:80',
+ 'X-Backend-Storage-Policy-Index': '0',
+ 'X-Backend-Replication': 'True',
+ 'X-Backend-Replication-Headers': (
+ 'content-length x-timestamp')})
+ self.assertEqual(req.read_body, '1234567')
+ req = _requests.pop(0)
+ self.assertEqual(req.method, 'POST')
+ self.assertEqual(req.path, '/device/partition/a/c/o7')
+ self.assertEqual(req.content_length, None)
+ self.assertEqual(req.headers, {
+ 'X-Timestamp': '1364456113.00008',
+ 'X-Object-Meta-Test-User': 'user_meta',
+ 'Host': 'localhost:80',
+ 'X-Backend-Storage-Policy-Index': '0',
+ 'X-Backend-Replication': 'True',
+ 'X-Backend-Replication-Headers': (
+ 'x-object-meta-test-user x-timestamp')})
self.assertEqual(_requests, [])
def test_UPDATES_subreq_does_not_read_all(self):
@@ -1916,5 +2042,125 @@ class TestSsyncRxServer(unittest.TestCase):
self.assertFalse(mock_missing_check.called)
+class TestModuleMethods(unittest.TestCase):
+ def test_decode_missing(self):
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ ts_iter = make_timestamp_iter()
+ t_data = ts_iter.next()
+ t_meta = ts_iter.next()
+ d_meta_data = t_meta.raw - t_data.raw
+
+ # legacy single timestamp string
+ msg = '%s %s' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ # hex meta delta encoded as extra message part
+ msg = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_data=t_data,
+ ts_meta=t_meta)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ # unexpected zero delta is tolerated
+ msg = '%s %s m:0' % (object_hash, t_data.internal)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_data,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ # unexpected subparts in timestamp delta part are tolerated
+ msg = '%s %s c:12345,m:%x,junk' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
+ self.assertEqual(
+ expected, ssync_receiver.decode_missing(msg))
+
+ # extra message parts tolerated
+ msg = '%s %s m:%x future parts' % (object_hash,
+ t_data.internal,
+ d_meta_data)
+ expected = dict(object_hash=object_hash,
+ ts_meta=t_meta,
+ ts_data=t_data)
+ self.assertEqual(expected, ssync_receiver.decode_missing(msg))
+
+ def test_encode_wanted(self):
+ ts_iter = make_timestamp_iter()
+ old_t_data = ts_iter.next()
+ t_data = ts_iter.next()
+ old_t_meta = ts_iter.next()
+ t_meta = ts_iter.next()
+
+ remote = {
+ 'object_hash': 'theremotehash',
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+
+ # missing
+ local = {}
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # out-of-sync
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash dm'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old data
+ local = {
+ 'ts_data': old_t_data,
+ 'ts_meta': t_meta,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old metadata
+ local = {
+ 'ts_data': t_data,
+ 'ts_meta': old_t_meta,
+ }
+ expected = 'theremotehash m'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # in-sync tombstone
+ local = {
+ 'ts_data': t_data,
+ }
+ expected = None
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+ # old tombstone
+ local = {
+ 'ts_data': old_t_data,
+ }
+ expected = 'theremotehash d'
+ self.assertEqual(ssync_receiver.encode_wanted(remote, local),
+ expected)
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 64cc75613..e9edafd4f 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from collections import defaultdict
import hashlib
import os
@@ -19,6 +20,7 @@ import shutil
import tempfile
import time
import unittest
+import urllib
import eventlet
import itertools
@@ -30,10 +32,10 @@ from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted
from swift.common.utils import Timestamp
-from swift.obj import ssync_sender, diskfile, server
+from swift.obj import ssync_sender, diskfile, server, ssync_receiver
from swift.obj.reconstructor import RebuildingECDiskFileStream
-from test.unit import debug_logger, patch_policies
+from test.unit import debug_logger, patch_policies, make_timestamp_iter
class FakeReplicator(object):
@@ -461,7 +463,7 @@ class TestSender(BaseTestSender):
self.daemon, node, job, ['ignored'],
remote_check_objs=None)
patch_sender(sender)
- sender.send_list = [wanted]
+ sender.send_map = {wanted: []}
sender.available_map = available_map
success, candidates = sender()
self.assertTrue(success)
@@ -475,7 +477,7 @@ class TestSender(BaseTestSender):
self.daemon, node, job, ['ignored'],
remote_check_objs=remote_check_objs)
patch_sender(sender)
- sender.send_list = [wanted]
+ sender.send_map = {wanted: []}
sender.available_map = available_map
success, candidates = sender()
self.assertTrue(success)
@@ -485,7 +487,7 @@ class TestSender(BaseTestSender):
'1380144474.44444')])
self.assertEqual(expected_map, candidates)
- def test_call_and_missing_check(self):
+ def test_call_and_missing_check_metadata_legacy_response(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == POLICIES.legacy:
@@ -493,12 +495,14 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000),
+ 'ts_meta': Timestamp(1380155570.00005)})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
self.sender.connection = FakeConnection()
+ self.sender.node = {}
self.sender.job = {
'device': 'dev',
'partition': '9',
@@ -510,6 +514,52 @@ class TestSender(BaseTestSender):
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n'
+ ':UPDATES: END\r\n'
+ ))
+ self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+ self.sender.connect = mock.MagicMock()
+ self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock()
+ self.sender.disconnect = mock.MagicMock()
+ success, candidates = self.sender()
+ self.assertTrue(success)
+ found_post = found_put = False
+ for chunk in self.sender.connection.sent:
+ if 'POST' in chunk:
+ found_post = True
+ if 'PUT' in chunk:
+ found_put = True
+ self.assertFalse(found_post)
+ self.assertTrue(found_put)
+ self.assertEqual(self.sender.failures, 0)
+
+ def test_call_and_missing_check(self):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
+ if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+ and policy == POLICIES.legacy:
+ yield (
+ '/srv/node/dev/objects/9/abc/'
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ '9d41d8cd98f00b204e9800998ecf0abc',
+ {'ts_data': Timestamp(1380144470.00000)})
+ else:
+ raise Exception(
+ 'No match for %r %r %r' % (device, partition, suffixes))
+
+ self.sender.connection = FakeConnection()
+ self.sender.node = {}
+ self.sender.job = {
+ 'device': 'dev',
+ 'partition': '9',
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender.suffixes = ['abc']
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':MISSING_CHECK: START\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -517,8 +567,9 @@ class TestSender(BaseTestSender):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ self.assertEqual(candidates,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ {'ts_data': Timestamp(1380144470.00000)})]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
@@ -529,7 +580,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
@@ -552,8 +603,9 @@ class TestSender(BaseTestSender):
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
self.assertTrue(success)
- self.assertEqual(candidates, dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ self.assertEqual(candidates,
+ dict([('9d41d8cd98f00b204e9800998ecf0abc',
+ {'ts_data': Timestamp(1380144470.00000)})]))
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
@@ -564,7 +616,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r' % (device, partition, suffixes))
@@ -574,13 +626,13 @@ class TestSender(BaseTestSender):
'policy': POLICIES.legacy,
'frag_index': 0,
}
- self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
+ self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
self.sender.connection = FakeConnection()
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+ '9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock()
@@ -753,7 +805,7 @@ class TestSender(BaseTestSender):
''.join(self.sender.connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.send_list, [])
+ self.assertEqual(self.sender.send_map, {})
self.assertEqual(self.sender.available_map, {})
def test_missing_check_has_suffixes(self):
@@ -765,17 +817,18 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
yield (
'/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf0def',
'9d41d8cd98f00b204e9800998ecf0def',
- '1380144472.22222')
+ {'ts_data': Timestamp(1380144472.22222)})
yield (
'/srv/node/dev/objects/9/def/'
'9d41d8cd98f00b204e9800998ecf1def',
'9d41d8cd98f00b204e9800998ecf1def',
- '1380144474.44444')
+ {'ts_data': Timestamp(1380144474.44444),
+ 'ts_meta': Timestamp(1380144475.44444)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -799,12 +852,17 @@ class TestSender(BaseTestSender):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222\r\n\r\n'
- '33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
+ '3b\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
+ 'm:186a0\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.send_list, [])
- candidates = [('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000'),
- ('9d41d8cd98f00b204e9800998ecf0def', '1380144472.22222'),
- ('9d41d8cd98f00b204e9800998ecf1def', '1380144474.44444')]
+ self.assertEqual(self.sender.send_map, {})
+ candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
+ dict(ts_data=Timestamp(1380144470.00000))),
+ ('9d41d8cd98f00b204e9800998ecf0def',
+ dict(ts_data=Timestamp(1380144472.22222))),
+ ('9d41d8cd98f00b204e9800998ecf1def',
+ dict(ts_data=Timestamp(1380144474.44444),
+ ts_meta=Timestamp(1380144475.44444)))]
self.assertEqual(self.sender.available_map, dict(candidates))
def test_missing_check_far_end_disconnect(self):
@@ -816,7 +874,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -844,7 +902,7 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ dict(ts_data=Timestamp(1380144470.00000)))]))
def test_missing_check_far_end_disconnect2(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -855,7 +913,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -884,7 +942,7 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ {'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_far_end_unexpected(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -895,7 +953,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -923,9 +981,9 @@ class TestSender(BaseTestSender):
'15\r\n:MISSING_CHECK: END\r\n\r\n')
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ {'ts_data': Timestamp(1380144470.00000)})]))
- def test_missing_check_send_list(self):
+ def test_missing_check_send_map(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == POLICIES.legacy and
@@ -934,7 +992,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -950,7 +1008,7 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '0123abc\r\n'
+ '0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
@@ -959,10 +1017,11 @@ class TestSender(BaseTestSender):
'17\r\n:MISSING_CHECK: START\r\n\r\n'
'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
'15\r\n:MISSING_CHECK: END\r\n\r\n')
- self.assertEqual(self.sender.send_list, ['0123abc'])
+ self.assertEqual(
+ self.sender.send_map, {'0123abc': {'data': True, 'meta': True}})
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ {'ts_data': Timestamp(1380144470.00000)})]))
def test_missing_check_extra_line_parts(self):
# check that sender tolerates extra parts in missing check
@@ -975,7 +1034,7 @@ class TestSender(BaseTestSender):
'/srv/node/dev/objects/9/abc/'
'9d41d8cd98f00b204e9800998ecf0abc',
'9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')
+ {'ts_data': Timestamp(1380144470.00000)})
else:
raise Exception(
'No match for %r %r %r %r' % (device, partition,
@@ -991,14 +1050,15 @@ class TestSender(BaseTestSender):
self.sender.response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
- '0123abc extra response parts\r\n'
+ '0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
self.sender.missing_check()
- self.assertEqual(self.sender.send_list, ['0123abc'])
+ self.assertEqual(self.sender.send_map,
+ {'0123abc': {'data': True}})
self.assertEqual(self.sender.available_map,
dict([('9d41d8cd98f00b204e9800998ecf0abc',
- '1380144470.00000')]))
+ {'ts_data': Timestamp(1380144470.00000)})]))
def test_updates_timeout(self):
self.sender.connection = FakeConnection()
@@ -1006,9 +1066,9 @@ class TestSender(BaseTestSender):
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates)
- def test_updates_empty_send_list(self):
+ def test_updates_empty_send_map(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1021,7 +1081,7 @@ class TestSender(BaseTestSender):
def test_updates_unexpected_response_lines1(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
'abc\r\n'
@@ -1040,7 +1100,7 @@ class TestSender(BaseTestSender):
def test_updates_unexpected_response_lines2(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1073,7 +1133,7 @@ class TestSender(BaseTestSender):
'frag_index': 0,
}
self.sender.node = {}
- self.sender.send_list = [object_hash]
+ self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
@@ -1107,7 +1167,7 @@ class TestSender(BaseTestSender):
'frag_index': 0,
}
self.sender.node = {}
- self.sender.send_list = [object_hash]
+ self.sender.send_map = {object_hash: {'data': True}}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1124,11 +1184,19 @@ class TestSender(BaseTestSender):
)
def test_updates_put(self):
+ # sender has data file and meta file
+ ts_iter = make_timestamp_iter()
device = 'dev'
part = '9'
object_parts = ('a', 'c', 'o')
- df = self._make_open_diskfile(device, part, *object_parts)
+ t1 = ts_iter.next()
+ df = self._make_open_diskfile(
+ device, part, *object_parts, timestamp=t1)
+ t2 = ts_iter.next()
+ metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
+ df.write_metadata(metadata)
object_hash = utils.hash_path(*object_parts)
+ df.open()
expected = df.get_metadata()
self.sender.connection = FakeConnection()
self.sender.job = {
@@ -1138,15 +1206,18 @@ class TestSender(BaseTestSender):
'frag_index': 0,
}
self.sender.node = {}
- self.sender.send_list = [object_hash]
+ # receiver requested data only
+ self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
+ self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates()
self.assertEqual(self.sender.send_delete.mock_calls, [])
+ self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
args, _kwargs = self.sender.send_put.call_args
path, df = args
@@ -1160,6 +1231,105 @@ class TestSender(BaseTestSender):
'11\r\n:UPDATES: START\r\n\r\n'
'f\r\n:UPDATES: END\r\n\r\n')
+ def test_updates_post(self):
+ ts_iter = make_timestamp_iter()
+ device = 'dev'
+ part = '9'
+ object_parts = ('a', 'c', 'o')
+ t1 = ts_iter.next()
+ df = self._make_open_diskfile(
+ device, part, *object_parts, timestamp=t1)
+ t2 = ts_iter.next()
+ metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
+ df.write_metadata(metadata)
+ object_hash = utils.hash_path(*object_parts)
+ df.open()
+ expected = df.get_metadata()
+ self.sender.connection = FakeConnection()
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender.node = {}
+ # receiver requested only meta
+ self.sender.send_map = {object_hash: {'meta': True}}
+ self.sender.send_delete = mock.MagicMock()
+ self.sender.send_put = mock.MagicMock()
+ self.sender.send_post = mock.MagicMock()
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':UPDATES: START\r\n'
+ ':UPDATES: END\r\n'))
+ self.sender.updates()
+ self.assertEqual(self.sender.send_delete.mock_calls, [])
+ self.assertEqual(self.sender.send_put.mock_calls, [])
+ self.assertEqual(1, len(self.sender.send_post.mock_calls))
+ args, _kwargs = self.sender.send_post.call_args
+ path, df = args
+ self.assertEqual(path, '/a/c/o')
+ self.assertIsInstance(df, diskfile.DiskFile)
+ self.assertEqual(expected, df.get_metadata())
+ # note that the post line isn't actually sent since we mock send_post;
+ # send_post is tested separately.
+ self.assertEqual(
+ ''.join(self.sender.connection.sent),
+ '11\r\n:UPDATES: START\r\n\r\n'
+ 'f\r\n:UPDATES: END\r\n\r\n')
+
+ def test_updates_put_and_post(self):
+ ts_iter = make_timestamp_iter()
+ device = 'dev'
+ part = '9'
+ object_parts = ('a', 'c', 'o')
+ t1 = ts_iter.next()
+ df = self._make_open_diskfile(
+ device, part, *object_parts, timestamp=t1)
+ t2 = ts_iter.next()
+ metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
+ df.write_metadata(metadata)
+ object_hash = utils.hash_path(*object_parts)
+ df.open()
+ expected = df.get_metadata()
+ self.sender.connection = FakeConnection()
+ self.sender.job = {
+ 'device': device,
+ 'partition': part,
+ 'policy': POLICIES.legacy,
+ 'frag_index': 0,
+ }
+ self.sender.node = {}
+ # receiver requested data and meta
+ self.sender.send_map = {object_hash: {'meta': True, 'data': True}}
+ self.sender.send_delete = mock.MagicMock()
+ self.sender.send_put = mock.MagicMock()
+ self.sender.send_post = mock.MagicMock()
+ self.sender.response = FakeResponse(
+ chunk_body=(
+ ':UPDATES: START\r\n'
+ ':UPDATES: END\r\n'))
+ self.sender.updates()
+ self.assertEqual(self.sender.send_delete.mock_calls, [])
+ self.assertEqual(1, len(self.sender.send_put.mock_calls))
+ self.assertEqual(1, len(self.sender.send_post.mock_calls))
+
+ args, _kwargs = self.sender.send_put.call_args
+ path, df = args
+ self.assertEqual(path, '/a/c/o')
+ self.assertIsInstance(df, diskfile.DiskFile)
+ self.assertEqual(expected, df.get_metadata())
+
+ args, _kwargs = self.sender.send_post.call_args
+ path, df = args
+ self.assertEqual(path, '/a/c/o')
+ self.assertIsInstance(df, diskfile.DiskFile)
+ self.assertEqual(expected, df.get_metadata())
+ self.assertEqual(
+ ''.join(self.sender.connection.sent),
+ '11\r\n:UPDATES: START\r\n\r\n'
+ 'f\r\n:UPDATES: END\r\n\r\n')
+
def test_updates_storage_policy_index(self):
device = 'dev'
part = '9'
@@ -1175,7 +1345,7 @@ class TestSender(BaseTestSender):
'policy': POLICIES[0],
'frag_index': 0}
self.sender.node = {}
- self.sender.send_list = [object_hash]
+ self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
@@ -1194,7 +1364,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_timeout_start(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1211,7 +1381,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_disconnect_start(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(chunk_body='\r\n')
exc = None
try:
@@ -1226,7 +1396,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_unexp_start(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
'anything else\r\n'
@@ -1245,7 +1415,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_timeout_end(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1264,7 +1434,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_disconnect_end(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1282,7 +1452,7 @@ class TestSender(BaseTestSender):
def test_updates_read_response_unexp_end(self):
self.sender.connection = FakeConnection()
- self.sender.send_list = []
+ self.sender.send_map = {}
self.sender.response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
@@ -1358,13 +1528,20 @@ class TestSender(BaseTestSender):
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
def test_send_put(self):
+ ts_iter = make_timestamp_iter()
+ t1 = ts_iter.next()
body = 'test'
extra_metadata = {'Some-Other-Header': 'value'}
- df = self._make_open_diskfile(body=body,
+ df = self._make_open_diskfile(body=body, timestamp=t1,
extra_metadata=extra_metadata)
expected = dict(df.get_metadata())
expected['body'] = body
expected['chunk_size'] = len(body)
+ # .meta file metadata is not included in expected for data only PUT
+ t2 = ts_iter.next()
+ metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
+ df.write_metadata(metadata)
+ df.open()
self.sender.connection = FakeConnection()
self.sender.send_put('/a/c/o', df)
self.assertEqual(
@@ -1380,6 +1557,32 @@ class TestSender(BaseTestSender):
'%(chunk_size)s\r\n'
'%(body)s\r\n' % expected)
+ def test_send_post(self):
+ # create .data file
+ extra_metadata = {'X-Object-Meta-Foo': 'old_value',
+ 'X-Object-Sysmeta-Test': 'test_sysmeta',
+ 'Content-Type': 'test_content_type'}
+ ts_0 = next(make_timestamp_iter())
+ df = self._make_open_diskfile(extra_metadata=extra_metadata,
+ timestamp=ts_0)
+ # create .meta file
+ ts_1 = next(make_timestamp_iter())
+ newer_metadata = {'X-Object-Meta-Foo': 'new_value',
+ 'X-Timestamp': ts_1.internal}
+ df.write_metadata(newer_metadata)
+
+ self.sender.connection = FakeConnection()
+ with df.open():
+ self.sender.send_post('/a/c/o', df)
+ self.assertEqual(
+ ''.join(self.sender.connection.sent),
+ '4c\r\n'
+ 'POST /a/c/o\r\n'
+ 'X-Object-Meta-Foo: new_value\r\n'
+ 'X-Timestamp: %s\r\n'
+ '\r\n'
+ '\r\n' % ts_1.internal)
+
def test_disconnect_timeout(self):
self.sender.connection = FakeConnection()
self.sender.connection.send = lambda d: eventlet.sleep(1)
@@ -1556,7 +1759,7 @@ class TestBaseSsync(BaseTestSender):
def tx_updates(results, line):
self.assertEqual('tx', line[0])
subrequests = results['tx_updates']
- if line[1].startswith(('PUT', 'DELETE')):
+ if line[1].startswith(('PUT', 'DELETE', 'POST')):
parts = line[1].split('\r\n')
method, path = parts[0].split()
subreq = {'method': method, 'path': path, 'req': line[1],
@@ -1626,6 +1829,8 @@ class TestBaseSsync(BaseTestSender):
"""
for o_name, diskfiles in tx_objs.items():
for tx_df in diskfiles:
+ # check tx file still intact - ssync does not do any cleanup!
+ tx_df.open()
if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
# this diskfile should have been sync'd,
# check rx file is ok
@@ -1641,24 +1846,21 @@ class TestBaseSsync(BaseTestSender):
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
o_name, policy,
frag_index=tx_df._frag_index)
- # check tx file still intact - ssync does not do any cleanup!
- tx_df.open()
def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync
for o_name, diskfiles in tx_objs.items():
- for tx_df_ in diskfiles:
- try:
- self._open_tx_diskfile(o_name, policy)
- self.fail('DiskFileDeleted expected')
- except DiskFileDeleted as exc:
- tx_delete_time = exc.timestamp
- try:
- self._open_rx_diskfile(o_name, policy)
- self.fail('DiskFileDeleted expected')
- except DiskFileDeleted as exc:
- rx_delete_time = exc.timestamp
- self.assertEqual(tx_delete_time, rx_delete_time)
+ try:
+ self._open_tx_diskfile(o_name, policy)
+ self.fail('DiskFileDeleted expected')
+ except DiskFileDeleted as exc:
+ tx_delete_time = exc.timestamp
+ try:
+ self._open_rx_diskfile(o_name, policy)
+ self.fail('DiskFileDeleted expected')
+ except DiskFileDeleted as exc:
+ rx_delete_time = exc.timestamp
+ self.assertEqual(tx_delete_time, rx_delete_time)
@patch_policies(with_ec_default=True)
@@ -1879,7 +2081,7 @@ class TestSsyncReplication(TestBaseSsync):
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx
t3a = next(self.ts_iter)
- rx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
+ rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx
@@ -1925,7 +2127,7 @@ class TestSsyncReplication(TestBaseSsync):
# run the sync protocol...
success, in_sync_objs = sender()
- self.assertEqual(7, len(in_sync_objs))
+ self.assertEqual(7, len(in_sync_objs), trace['messages'])
self.assertTrue(success)
# verify protocol
@@ -1983,6 +2185,287 @@ class TestSsyncReplication(TestBaseSsync):
# TOTAL = 80
self.assertEqual(80, trace.get('readline_bytes'))
+ def test_meta_file_sync(self):
+ policy = POLICIES.default
+ rx_node_index = 0
+
+ # create diskfiles...
+ tx_objs = {}
+ rx_objs = {}
+ tx_tombstones = {}
+ rx_tombstones = {}
+ tx_df_mgr = self.daemon._diskfile_router[policy]
+ rx_df_mgr = self.rx_controller._diskfile_router[policy]
+
+ expected_subreqs = defaultdict(list)
+
+ # o1 on tx only with meta file
+ t1 = self.ts_iter.next()
+ tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
+ t1_meta = self.ts_iter.next()
+ metadata = {'X-Timestamp': t1_meta.internal,
+ 'X-Object-Meta-Test': 'o1',
+ 'X-Object-Sysmeta-Test': 'sys_o1'}
+ tx_objs['o1'][0].write_metadata(metadata)
+ expected_subreqs['PUT'].append('o1')
+ expected_subreqs['POST'].append('o1')
+
+ # o2 on tx with meta, on rx without meta
+ t2 = self.ts_iter.next()
+ tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
+ t2_meta = self.ts_iter.next()
+ metadata = {'X-Timestamp': t2_meta.internal,
+ 'X-Object-Meta-Test': 'o2',
+ 'X-Object-Sysmeta-Test': 'sys_o2'}
+ tx_objs['o2'][0].write_metadata(metadata)
+ rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
+ expected_subreqs['POST'].append('o2')
+
+ # o3 is on tx with meta, rx has newer data but no meta
+ t3a = self.ts_iter.next()
+ tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
+ t3b = self.ts_iter.next()
+ rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
+ t3_meta = self.ts_iter.next()
+ metadata = {'X-Timestamp': t3_meta.internal,
+ 'X-Object-Meta-Test': 'o3',
+ 'X-Object-Sysmeta-Test': 'sys_o3'}
+ tx_objs['o3'][0].write_metadata(metadata)
+ expected_subreqs['POST'].append('o3')
+
+ # o4 is on tx with meta, rx has older data and up to date meta
+ t4a = self.ts_iter.next()
+ rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
+ t4b = self.ts_iter.next()
+ tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
+ t4_meta = self.ts_iter.next()
+ metadata = {'X-Timestamp': t4_meta.internal,
+ 'X-Object-Meta-Test': 'o4',
+ 'X-Object-Sysmeta-Test': 'sys_o4'}
+ tx_objs['o4'][0].write_metadata(metadata)
+ rx_objs['o4'][0].write_metadata(metadata)
+ expected_subreqs['PUT'].append('o4')
+
+ # o5 is on tx with meta, rx is in sync with data and meta
+ t5 = self.ts_iter.next()
+ rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
+ tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
+ t5_meta = self.ts_iter.next()
+ metadata = {'X-Timestamp': t5_meta.internal,
+ 'X-Object-Meta-Test': 'o5',
+ 'X-Object-Sysmeta-Test': 'sys_o5'}
+ tx_objs['o5'][0].write_metadata(metadata)
+ rx_objs['o5'][0].write_metadata(metadata)
+
+ # o6 is tombstone on tx, rx has older data and meta
+ t6 = self.ts_iter.next()
+ tx_tombstones['o6'] = self._create_ondisk_files(
+ tx_df_mgr, 'o6', policy, t6)
+ rx_tombstones['o6'] = self._create_ondisk_files(
+ rx_df_mgr, 'o6', policy, t6)
+ metadata = {'X-Timestamp': self.ts_iter.next().internal,
+ 'X-Object-Meta-Test': 'o6',
+ 'X-Object-Sysmeta-Test': 'sys_o6'}
+ rx_tombstones['o6'][0].write_metadata(metadata)
+ tx_tombstones['o6'][0].delete(self.ts_iter.next())
+ expected_subreqs['DELETE'].append('o6')
+
+ # o7 is tombstone on rx, tx has older data and meta,
+ # no subreqs expected...
+ t7 = self.ts_iter.next()
+ tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
+ rx_tombstones['o7'] = self._create_ondisk_files(
+ rx_df_mgr, 'o7', policy, t7)
+ metadata = {'X-Timestamp': self.ts_iter.next().internal,
+ 'X-Object-Meta-Test': 'o7',
+ 'X-Object-Sysmeta-Test': 'sys_o7'}
+ tx_objs['o7'][0].write_metadata(metadata)
+ rx_tombstones['o7'][0].delete(self.ts_iter.next())
+
+ suffixes = set()
+ for diskfiles in (tx_objs.values() + tx_tombstones.values()):
+ for df in diskfiles:
+ suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
+
+ # create ssync sender instance...
+ job = {'device': self.device,
+ 'partition': self.partition,
+ 'policy': policy}
+ node = dict(self.rx_node)
+ node.update({'index': rx_node_index})
+ sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+ # wrap connection from tx to rx to capture ssync messages...
+ sender.connect, trace = self.make_connect_wrapper(sender)
+
+ # run the sync protocol...
+ success, in_sync_objs = sender()
+
+ self.assertEqual(7, len(in_sync_objs))
+ self.assertTrue(success)
+
+ # verify protocol
+ results = self._analyze_trace(trace)
+ self.assertEqual(7, len(results['tx_missing']))
+ self.assertEqual(5, len(results['rx_missing']))
+ for subreq in results.get('tx_updates'):
+ obj = subreq['path'].split('/')[3]
+ method = subreq['method']
+ self.assertTrue(obj in expected_subreqs[method],
+ 'Unexpected %s subreq for object %s, expected %s'
+ % (method, obj, expected_subreqs[method]))
+ expected_subreqs[method].remove(obj)
+ if method == 'PUT':
+ expected_body = '%s___None' % subreq['path']
+ self.assertEqual(expected_body, subreq['body'])
+ # verify all expected subreqs consumed
+ for _method, expected in expected_subreqs.items():
+ self.assertFalse(expected)
+ self.assertFalse(results['rx_updates'])
+
+ # verify on disk files...
+ del tx_objs['o7'] # o7 not expected to be sync'd
+ self._verify_ondisk_files(tx_objs, policy)
+ self._verify_tombstones(tx_tombstones, policy)
+ for oname, rx_obj in rx_objs.items():
+ df = rx_obj[0].open()
+ metadata = df.get_metadata()
+ self.assertEqual(metadata['X-Object-Meta-Test'], oname)
+ self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
+
+ def test_meta_file_not_synced_to_legacy_receiver(self):
+ # verify that the sender does sync a data file to a legacy receiver,
+ # but does not PUT meta file content to a legacy receiver
+ policy = POLICIES.default
+ rx_node_index = 0
+
+ # create diskfiles...
+ tx_df_mgr = self.daemon._diskfile_router[policy]
+ rx_df_mgr = self.rx_controller._diskfile_router[policy]
+
+ # rx has data at t1 but no meta
+ # object is on tx with data at t2, meta at t3,
+ t1 = self.ts_iter.next()
+ self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
+ t2 = self.ts_iter.next()
+ tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
+ t3 = self.ts_iter.next()
+ metadata = {'X-Timestamp': t3.internal,
+ 'X-Object-Meta-Test': 'o3',
+ 'X-Object-Sysmeta-Test': 'sys_o3'}
+ tx_obj.write_metadata(metadata)
+
+ suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
+ # create ssync sender instance...
+ job = {'device': self.device,
+ 'partition': self.partition,
+ 'policy': policy}
+ node = dict(self.rx_node)
+ node.update({'index': rx_node_index})
+ sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+ # wrap connection from tx to rx to capture ssync messages...
+ sender.connect, trace = self.make_connect_wrapper(sender)
+
+ def _legacy_check_missing(self, line):
+ # reproduces behavior of 'legacy' ssync receiver missing_checks()
+ parts = line.split()
+ object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
+ want = False
+ try:
+ df = self.diskfile_mgr.get_diskfile_from_hash(
+ self.device, self.partition, object_hash, self.policy,
+ frag_index=self.frag_index)
+ except exceptions.DiskFileNotExist:
+ want = True
+ else:
+ try:
+ df.open()
+ except exceptions.DiskFileDeleted as err:
+ want = err.timestamp < timestamp
+ except exceptions.DiskFileError as err:
+ want = True
+ else:
+ want = df.timestamp < timestamp
+ if want:
+ return urllib.quote(object_hash)
+ return None
+
+ # run the sync protocol...
+ func = 'swift.obj.ssync_receiver.Receiver._check_missing'
+ with mock.patch(func, _legacy_check_missing):
+ success, in_sync_objs = sender()
+
+ self.assertEqual(1, len(in_sync_objs))
+ self.assertTrue(success)
+
+ # verify protocol, expecting only a PUT to legacy receiver
+ results = self._analyze_trace(trace)
+ self.assertEqual(1, len(results['tx_missing']))
+ self.assertEqual(1, len(results['rx_missing']))
+ self.assertEqual(1, len(results['tx_updates']))
+ self.assertEqual('PUT', results['tx_updates'][0]['method'])
+ self.assertFalse(results['rx_updates'])
+
+ # verify on disk files...
+ rx_obj = self._open_rx_diskfile('o1', policy)
+ tx_obj = self._open_tx_diskfile('o1', policy)
+ # with legacy behavior rx_obj data and meta timestamps are equal
+ self.assertEqual(t2, rx_obj.data_timestamp)
+ self.assertEqual(t2, rx_obj.timestamp)
+ # with legacy behavior rx_obj data timestamp should equal tx_obj
+ self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
+ # tx meta file should not have been sync'd to rx data file
+ self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())
+
+
+class TestModuleMethods(unittest.TestCase):
+ def test_encode_missing(self):
+ object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
+ ts_iter = make_timestamp_iter()
+ t_data = ts_iter.next()
+ t_meta = ts_iter.next()
+ d_meta_data = t_meta.raw - t_data.raw
+
+ # equal data and meta timestamps -> legacy single timestamp string
+ expected = '%s %s' % (object_hash, t_data.internal)
+ self.assertEqual(
+ expected,
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_data))
+
+ # newer meta timestamp -> hex data delta encoded as extra message part
+ expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
+ self.assertEqual(
+ expected,
+ ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_meta))
+
+ # test encode and decode functions invert
+ expected = {'object_hash': object_hash, 'ts_meta': t_meta,
+ 'ts_data': t_data}
+ msg = ssync_sender.encode_missing(**expected)
+ actual = ssync_receiver.decode_missing(msg)
+ self.assertEqual(expected, actual)
+
+ def test_decode_wanted(self):
+ parts = ['d']
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['m']
+ expected = {'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['dm']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ # you don't really need these next few...
+ parts = ['md']
+ expected = {'data': True, 'meta': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
+ parts = ['xcy', 'funny', {'business': True}]
+ expected = {'data': True}
+ self.assertEqual(ssync_sender.decode_wanted(parts), expected)
+
if __name__ == '__main__':
unittest.main()