diff options
Diffstat (limited to 'swift/container')
-rw-r--r-- | swift/container/backend.py | 70 | ||||
-rw-r--r-- | swift/container/server.py | 44 | ||||
-rw-r--r-- | swift/container/sharder.py | 17 |
3 files changed, 91 insertions, 40 deletions
diff --git a/swift/container/backend.py b/swift/container/backend.py index 0a18fe48f..bdf34f7d8 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \ get_db_files, parse_db_filename, make_db_file_path, split_path, \ RESERVED_BYTE from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \ - zero_like, DatabaseAlreadyExists - -SQLITE_ARG_LIMIT = 999 + zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT DATADIR = 'containers' @@ -62,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED, # tuples and vice-versa SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count', 'bytes_used', 'meta_timestamp', 'deleted', 'state', - 'state_timestamp', 'epoch') + 'state_timestamp', 'epoch', 'reported') POLICY_STAT_TABLE_CREATE = ''' CREATE TABLE policy_stat ( @@ -269,6 +267,7 @@ def merge_shards(shard_data, existing): if existing['timestamp'] < shard_data['timestamp']: # note that currently we do not roll forward any meta or state from # an item that was created at older time, newer created time trumps + shard_data['reported'] = 0 # reset the latch return True elif existing['timestamp'] > shard_data['timestamp']: return False @@ -285,6 +284,18 @@ def merge_shards(shard_data, existing): else: new_content = True + # We can latch the reported flag + if existing['reported'] and \ + existing['object_count'] == shard_data['object_count'] and \ + existing['bytes_used'] == shard_data['bytes_used'] and \ + existing['state'] == shard_data['state'] and \ + existing['epoch'] == shard_data['epoch']: + shard_data['reported'] = 1 + else: + shard_data.setdefault('reported', 0) + if shard_data['reported'] and not existing['reported']: + new_content = True + if (existing['state_timestamp'] == shard_data['state_timestamp'] and shard_data['state'] > existing['state']): new_content = True @@ -597,7 +608,8 @@ class ContainerBroker(DatabaseBroker): deleted INTEGER DEFAULT 0, state INTEGER, state_timestamp TEXT, - epoch TEXT + epoch TEXT, + reported INTEGER DEFAULT 0 ); """ % SHARD_RANGE_TABLE) @@ -1430,10 +1442,13 @@ class ContainerBroker(DatabaseBroker): # sqlite3.OperationalError: cannot start a transaction # within a transaction conn.rollback() - if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): - raise - self.create_shard_range_table(conn) - return _really_merge_items(conn) + if 'no such column: reported' in str(err): + self._migrate_add_shard_range_reported(conn) + return _really_merge_items(conn) + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + self.create_shard_range_table(conn) + return _really_merge_items(conn) + raise def get_reconciler_sync(self): with self.get() as conn: @@ -1581,9 +1596,20 @@ class ContainerBroker(DatabaseBroker): CONTAINER_STAT_VIEW_SCRIPT + 'COMMIT;') - def _reclaim(self, conn, age_timestamp, sync_timestamp): - super(ContainerBroker, self)._reclaim(conn, age_timestamp, - sync_timestamp) + def _migrate_add_shard_range_reported(self, conn): + """ + Add the reported column to the 'shard_range' table. + """ + conn.executescript(''' + BEGIN; + ALTER TABLE %s + ADD COLUMN reported INTEGER DEFAULT 0; + COMMIT; + ''' % SHARD_RANGE_TABLE) + + def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): + super(ContainerBroker, self)._reclaim_other_stuff( + conn, age_timestamp, sync_timestamp) # populate instance cache, but use existing conn to avoid deadlock # when it has a pending update self._populate_instance_cache(conn=conn) @@ -1630,7 +1656,7 @@ class ContainerBroker(DatabaseBroker): elif states is not None: included_states.add(states) - def do_query(conn): + def do_query(conn, use_reported_column=True): condition = '' conditions = [] params = [] @@ -1648,21 +1674,27 @@ class ContainerBroker(DatabaseBroker): params.append(self.path) if conditions: condition = ' WHERE ' + ' AND '.join(conditions) + if use_reported_column: + columns = SHARD_RANGE_KEYS + else: + columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', ) sql = ''' SELECT %s FROM %s%s; - ''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition) + ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition) data = conn.execute(sql, params) data.row_factory = None return [row for row in data] - try: - with self.maybe_get(connection) as conn: + with self.maybe_get(connection) as conn: + try: return do_query(conn) - except sqlite3.OperationalError as err: - if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): + except sqlite3.OperationalError as err: + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + return [] + if 'no such column: reported' in str(err): + return do_query(conn, use_reported_column=False) raise - return [] @classmethod def resolve_shard_range_states(cls, states): diff --git a/swift/container/server.py b/swift/container/server.py index c8d7647aa..db9ac0291 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -155,6 +155,8 @@ class ContainerController(BaseStorageServer): conf['auto_create_account_prefix'] else: self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX + self.shards_account_prefix = ( + self.auto_create_account_prefix + 'shards_') if config_true_value(conf.get('allow_versions', 'f')): self.save_headers.append('x-versions-location') if 'allow_versions' in conf: @@ -375,14 +377,12 @@ class ContainerController(BaseStorageServer): # auto create accounts) obj_policy_index = self.get_and_validate_policy_index(req) or 0 broker = self._get_container_broker(drive, part, account, container) - if account.startswith(self.auto_create_account_prefix) and obj and \ - not os.path.exists(broker.db_file): - try: - broker.initialize(req_timestamp.internal, obj_policy_index) - except DatabaseAlreadyExists: - pass - if not os.path.exists(broker.db_file): + if obj: + self._maybe_autocreate(broker, req_timestamp, account, + obj_policy_index, req) + elif not os.path.exists(broker.db_file): return HTTPNotFound() + if obj: # delete object # redirect if a shard range exists for the object name redirect = self._redirect_to_shard(req, broker, obj) @@ -449,11 +449,25 @@ class ContainerController(BaseStorageServer): broker.update_status_changed_at(timestamp) return recreated + def _should_autocreate(self, account, req): + auto_create_header = req.headers.get('X-Backend-Auto-Create') + if auto_create_header: + # If the caller included an explicit X-Backend-Auto-Create header, + # assume they know the behavior they want + return config_true_value(auto_create_header) + if account.startswith(self.shards_account_prefix): + # we have to specical case this subset of the + # auto_create_account_prefix because we don't want the updater + # accidently auto-creating shards; only the sharder creates + # shards and it will explicitly tell the server to do so + return False + return account.startswith(self.auto_create_account_prefix) + def _maybe_autocreate(self, broker, req_timestamp, account, - policy_index): + policy_index, req): created = False - if account.startswith(self.auto_create_account_prefix) and \ - not os.path.exists(broker.db_file): + should_autocreate = self._should_autocreate(account, req) + if should_autocreate and not os.path.exists(broker.db_file): if policy_index is None: raise HTTPBadRequest( 'X-Backend-Storage-Policy-Index header is required') @@ -506,8 +520,8 @@ class ContainerController(BaseStorageServer): # obj put expects the policy_index header, default is for # legacy support during upgrade. obj_policy_index = requested_policy_index or 0 - self._maybe_autocreate(broker, req_timestamp, account, - obj_policy_index) + self._maybe_autocreate( + broker, req_timestamp, account, obj_policy_index, req) # redirect if a shard exists for this object name response = self._redirect_to_shard(req, broker, obj) if response: @@ -531,8 +545,8 @@ class ContainerController(BaseStorageServer): for sr in json.loads(req.body)] except (ValueError, KeyError, TypeError) as err: return HTTPBadRequest('Invalid body: %r' % err) - created = self._maybe_autocreate(broker, req_timestamp, account, - requested_policy_index) + created = self._maybe_autocreate( + broker, req_timestamp, account, requested_policy_index, req) self._update_metadata(req, broker, req_timestamp, 'PUT') if shard_ranges: # TODO: consider writing the shard ranges into the pending @@ -805,7 +819,7 @@ class ContainerController(BaseStorageServer): requested_policy_index = self.get_and_validate_policy_index(req) broker = self._get_container_broker(drive, part, account, container) self._maybe_autocreate(broker, req_timestamp, account, - requested_policy_index) + requested_policy_index, req) try: objs = json.load(req.environ['wsgi.input']) except ValueError as err: diff --git a/swift/container/sharder.py b/swift/container/sharder.py index d9aa7c66d..dd33043ae 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -618,7 +618,8 @@ class ContainerSharder(ContainerReplicator): def _send_shard_ranges(self, account, container, shard_ranges, headers=None): - body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii') + body = json.dumps([dict(sr, reported=0) + for sr in shard_ranges]).encode('ascii') part, nodes = self.ring.get_nodes(account, container) headers = headers or {} headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD, @@ -1148,7 +1149,8 @@ class ContainerSharder(ContainerReplicator): 'X-Backend-Storage-Policy-Index': broker.storage_policy_index, 'X-Container-Sysmeta-Shard-Quoted-Root': quote( broker.root_path), - 'X-Container-Sysmeta-Sharding': True} + 'X-Container-Sysmeta-Sharding': 'True', + 'X-Backend-Auto-Create': 'True'} # NB: we *used* to send along # 'X-Container-Sysmeta-Shard-Root': broker.root_path # but that isn't safe for container names with nulls or newlines @@ -1468,7 +1470,7 @@ class ContainerSharder(ContainerReplicator): def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) - if not own_shard_range: + if not own_shard_range or own_shard_range.reported: return # persist the reported shard metadata @@ -1478,9 +1480,12 @@ class ContainerSharder(ContainerReplicator): include_own=True, include_deleted=True) # send everything - self._send_shard_ranges( - broker.root_account, broker.root_container, - shard_ranges) + if self._send_shard_ranges( + broker.root_account, broker.root_container, shard_ranges): + # on success, mark ourselves as reported so we don't keep + # hammering the root + own_shard_range.reported = True + broker.merge_shard_ranges(own_shard_range) def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated |