summaryrefslogtreecommitdiff
path: root/swift/obj/ssync_receiver.py
diff options
context:
space:
mode:
authorpaul luse <paul.e.luse@intel.com>2014-10-28 09:51:06 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2015-04-14 00:52:17 -0700
commit647b66a2ce4c85c43dcca49776d35c5ebb9cf15e (patch)
tree4dd192eb498e4cb47a779a80936c2668c0edeacb /swift/obj/ssync_receiver.py
parentb2189ef47ae08c39c348e7f4c90697ecb9ba64f9 (diff)
downloadswift-647b66a2ce4c85c43dcca49776d35c5ebb9cf15e.tar.gz
Erasure Code Reconstructor
This patch adds the erasure code reconstructor. It follows the design of the replicator but: - There is no notion of update() or update_deleted(). - There is a single job processor - Jobs are processed partition by partition. - At the end of processing a rebalanced or handoff partition, the reconstructor will remove successfully reverted objects if any. And various ssync changes such as the addition of reconstruct_fa() function called from ssync_sender which performs the actual reconstruction while sending the object to the receiver Co-Authored-By: Alistair Coles <alistair.coles@hp.com> Co-Authored-By: Thiago da Silva <thiago@redhat.com> Co-Authored-By: John Dickinson <me@not.mn> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Tushar Gohad <tushar.gohad@intel.com> Co-Authored-By: Samuel Merritt <sam@swiftstack.com> Co-Authored-By: Christian Schwede <christian.schwede@enovance.com> Co-Authored-By: Yuan Zhou <yuan.zhou@intel.com> blueprint ec-reconstructor Change-Id: I7d15620dc66ee646b223bb9fff700796cd6bef51
Diffstat (limited to 'swift/obj/ssync_receiver.py')
-rw-r--r--swift/obj/ssync_receiver.py37
1 files changed, 21 insertions, 16 deletions
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 99495cd48..b636a1624 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -29,23 +29,23 @@ from swift.common import request_helpers
class Receiver(object):
"""
- Handles incoming REPLICATION requests to the object server.
+ Handles incoming SSYNC requests to the object server.
These requests come from the object-replicator daemon that uses
:py:mod:`.ssync_sender`.
- The number of concurrent REPLICATION requests is restricted by
+ The number of concurrent SSYNC requests is restricted by
use of a replication_semaphore and can be configured with the
object-server.conf [object-server] replication_concurrency
setting.
- A REPLICATION request is really just an HTTP conduit for
+ An SSYNC request is really just an HTTP conduit for
sender/receiver replication communication. The overall
- REPLICATION request should always succeed, but it will contain
+ SSYNC request should always succeed, but it will contain
multiple requests within its request and response bodies. This
"hack" is done so that replication concurrency can be managed.
- The general process inside a REPLICATION request is:
+ The general process inside an SSYNC request is:
1. Initialize the request: Basic request validation, mount check,
acquire semaphore lock, etc..
@@ -73,10 +73,10 @@ class Receiver(object):
def __call__(self):
"""
- Processes a REPLICATION request.
+ Processes an SSYNC request.
Acquires a semaphore lock and then proceeds through the steps
- of the REPLICATION process.
+ of the SSYNC process.
"""
# The general theme for functions __call__ calls is that they should
# raise exceptions.MessageTimeout for client timeouts (logged locally),
@@ -89,7 +89,7 @@ class Receiver(object):
try:
# Double try blocks in case our main error handlers fail.
try:
- # intialize_request is for preamble items that can be done
+ # initialize_request is for preamble items that can be done
# outside a replication semaphore lock.
for data in self.initialize_request():
yield data
@@ -112,7 +112,7 @@ class Receiver(object):
self.app.replication_semaphore.release()
except exceptions.ReplicationLockTimeout as err:
self.app.logger.debug(
- '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % (
+ '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
yield ':ERROR: %d %r\n' % (0, str(err))
@@ -169,8 +169,11 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
- self.policy_idx = \
- int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
+ if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
+ self.frag_index = int(
+ self.request.headers['X-Backend-Ssync-Frag-Index'])
+ else:
+ self.frag_index = None
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if self.diskfile_mgr.mount_check and not constraints.check_mount(
@@ -183,7 +186,7 @@ class Receiver(object):
def missing_check(self):
"""
Handles the receiver-side of the MISSING_CHECK step of a
- REPLICATION request.
+ SSYNC request.
Receives a list of hashes and timestamps of object
information the sender can provide and responds with a list
@@ -227,11 +230,13 @@ class Receiver(object):
line = self.fp.readline(self.app.network_chunk_size)
if not line or line.strip() == ':MISSING_CHECK: END':
break
- object_hash, timestamp = [urllib.unquote(v) for v in line.split()]
+ parts = line.split()
+ object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]]
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy)
+ self.device, self.partition, object_hash, self.policy,
+ frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
@@ -254,7 +259,7 @@ class Receiver(object):
def updates(self):
"""
- Handles the UPDATES step of a REPLICATION request.
+ Handles the UPDATES step of an SSYNC request.
Receives a set of PUT and DELETE subrequests that will be
routed to the object server itself for processing. These
@@ -354,7 +359,7 @@ class Receiver(object):
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
- subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx
+ subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \