diff options
Diffstat (limited to 'swift/obj/ssync_receiver.py')
-rw-r--r-- | swift/obj/ssync_receiver.py | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 99495cd48..b636a1624 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -29,23 +29,23 @@ from swift.common import request_helpers class Receiver(object): """ - Handles incoming REPLICATION requests to the object server. + Handles incoming SSYNC requests to the object server. These requests come from the object-replicator daemon that uses :py:mod:`.ssync_sender`. - The number of concurrent REPLICATION requests is restricted by + The number of concurrent SSYNC requests is restricted by use of a replication_semaphore and can be configured with the object-server.conf [object-server] replication_concurrency setting. - A REPLICATION request is really just an HTTP conduit for + An SSYNC request is really just an HTTP conduit for sender/receiver replication communication. The overall - REPLICATION request should always succeed, but it will contain + SSYNC request should always succeed, but it will contain multiple requests within its request and response bodies. This "hack" is done so that replication concurrency can be managed. - The general process inside a REPLICATION request is: + The general process inside an SSYNC request is: 1. Initialize the request: Basic request validation, mount check, acquire semaphore lock, etc.. @@ -73,10 +73,10 @@ class Receiver(object): def __call__(self): """ - Processes a REPLICATION request. + Processes an SSYNC request. Acquires a semaphore lock and then proceeds through the steps - of the REPLICATION process. + of the SSYNC process. """ # The general theme for functions __call__ calls is that they should # raise exceptions.MessageTimeout for client timeouts (logged locally), @@ -89,7 +89,7 @@ class Receiver(object): try: # Double try blocks in case our main error handlers fail. try: - # intialize_request is for preamble items that can be done + # initialize_request is for preamble items that can be done # outside a replication semaphore lock. for data in self.initialize_request(): yield data @@ -112,7 +112,7 @@ class Receiver(object): self.app.replication_semaphore.release() except exceptions.ReplicationLockTimeout as err: self.app.logger.debug( - '%s/%s/%s REPLICATION LOCK TIMEOUT: %s' % ( + '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % ( self.request.remote_addr, self.device, self.partition, err)) yield ':ERROR: %d %r\n' % (0, str(err)) @@ -169,8 +169,11 @@ class Receiver(object): self.request.environ['eventlet.minimum_write_chunk_size'] = 0 self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) - self.policy_idx = \ - int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0)) + if 'X-Backend-Ssync-Frag-Index' in self.request.headers: + self.frag_index = int( + self.request.headers['X-Backend-Ssync-Frag-Index']) + else: + self.frag_index = None utils.validate_device_partition(self.device, self.partition) self.diskfile_mgr = self.app._diskfile_router[self.policy] if self.diskfile_mgr.mount_check and not constraints.check_mount( @@ -183,7 +186,7 @@ class Receiver(object): def missing_check(self): """ Handles the receiver-side of the MISSING_CHECK step of a - REPLICATION request. + SSYNC request. Receives a list of hashes and timestamps of object information the sender can provide and responds with a list @@ -227,11 +230,13 @@ class Receiver(object): line = self.fp.readline(self.app.network_chunk_size) if not line or line.strip() == ':MISSING_CHECK: END': break - object_hash, timestamp = [urllib.unquote(v) for v in line.split()] + parts = line.split() + object_hash, timestamp = [urllib.unquote(v) for v in parts[:2]] want = False try: df = self.diskfile_mgr.get_diskfile_from_hash( - self.device, self.partition, object_hash, self.policy) + self.device, self.partition, object_hash, self.policy, + frag_index=self.frag_index) except exceptions.DiskFileNotExist: want = True else: @@ -254,7 +259,7 @@ class Receiver(object): def updates(self): """ - Handles the UPDATES step of a REPLICATION request. + Handles the UPDATES step of an SSYNC request. Receives a set of PUT and DELETE subrequests that will be routed to the object server itself for processing. These @@ -354,7 +359,7 @@ class Receiver(object): subreq_iter()) else: raise Exception('Invalid subrequest method %s' % method) - subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx + subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) subreq.headers['X-Backend-Replication'] = 'True' if replication_headers: subreq.headers['X-Backend-Replication-Headers'] = \ |