# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Pluggable Back-ends for Container Server
"""

import errno
import os
from uuid import uuid4

import six
from six.moves import range
from six.moves.urllib.parse import unquote
import sqlite3

from eventlet import tpool

from swift.common.constraints import CONTAINER_LISTING_LIMIT
from swift.common.exceptions import LockTimeout
from swift.common.utils import Timestamp, encode_timestamps, \
    decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
    ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
    parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
    filter_namespaces, ShardRangeList
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
    zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT

DATADIR = 'containers'

RECORD_TYPE_OBJECT = 'object'
RECORD_TYPE_SHARD = 'shard'
SHARD_RANGE_TABLE = 'shard_range'

NOTFOUND = 'not_found'
UNSHARDED = 'unsharded'
SHARDING = 'sharding'
SHARDED = 'sharded'
COLLAPSED = 'collapsed'

SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
                      ShardRange.SHRINKING]
SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
                       ShardRange.ACTIVE, ShardRange.SHARDING]
# when auditing, a shard gets its own shard range, which could be in any
# state except FOUND, and any potential acceptors, excluding FOUND ranges
# that may be unwanted overlaps
SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
                         ShardRange.ACTIVE, ShardRange.SHARDING,
                         ShardRange.SHARDED, ShardRange.SHRINKING,
                         ShardRange.SHRUNK]

# shards may not be fully populated while in the FOUND and CREATED states,
# so shards should only update their own shard range's object stats when
# they are in the following states
SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE,
                            ShardRange.SHARDING, ShardRange.SHARDED,
                            ShardRange.SHRINKING, ShardRange.SHRUNK]

# attribute names in order used when transforming shard ranges from dicts to
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
                    'bytes_used', 'meta_timestamp', 'deleted', 'state',
                    'state_timestamp', 'epoch', 'reported', 'tombstones')

POLICY_STAT_TABLE_CREATE = '''
    CREATE TABLE policy_stat (
        storage_policy_index INTEGER PRIMARY KEY,
        object_count INTEGER DEFAULT 0,
        bytes_used INTEGER DEFAULT 0
    );
'''

POLICY_STAT_TRIGGER_SCRIPT = '''
    CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object
    BEGIN
        UPDATE policy_stat
        SET object_count = object_count + (1 - new.deleted),
            bytes_used = bytes_used + new.size
        WHERE storage_policy_index = new.storage_policy_index;
        INSERT INTO policy_stat (
            storage_policy_index, object_count, bytes_used)
        SELECT new.storage_policy_index, (1 - new.deleted), new.size
        WHERE NOT EXISTS(
            SELECT changes() as change
            FROM policy_stat
            WHERE change <> 0
        );
        UPDATE container_info
        SET hash = chexor(hash, new.name, new.created_at);
    END;

    CREATE TRIGGER object_delete_policy_stat
AFTER DELETE ON object BEGIN UPDATE policy_stat SET object_count = object_count - (1 - old.deleted), bytes_used = bytes_used - old.size WHERE storage_policy_index = old.storage_policy_index; UPDATE container_info SET hash = chexor(hash, old.name, old.created_at); END; ''' CONTAINER_INFO_TABLE_SCRIPT = ''' CREATE TABLE container_info ( account TEXT, container TEXT, created_at TEXT, put_timestamp TEXT DEFAULT '0', delete_timestamp TEXT DEFAULT '0', reported_put_timestamp TEXT DEFAULT '0', reported_delete_timestamp TEXT DEFAULT '0', reported_object_count INTEGER DEFAULT 0, reported_bytes_used INTEGER DEFAULT 0, hash TEXT default '00000000000000000000000000000000', id TEXT, status TEXT DEFAULT '', status_changed_at TEXT DEFAULT '0', metadata TEXT DEFAULT '', x_container_sync_point1 INTEGER DEFAULT -1, x_container_sync_point2 INTEGER DEFAULT -1, storage_policy_index INTEGER DEFAULT 0, reconciler_sync_point INTEGER DEFAULT -1 ); ''' CONTAINER_STAT_VIEW_SCRIPT = ''' CREATE VIEW container_stat AS SELECT ci.account, ci.container, ci.created_at, ci.put_timestamp, ci.delete_timestamp, ci.reported_put_timestamp, ci.reported_delete_timestamp, ci.reported_object_count, ci.reported_bytes_used, ci.hash, ci.id, ci.status, ci.status_changed_at, ci.metadata, ci.x_container_sync_point1, ci.x_container_sync_point2, ci.reconciler_sync_point, ci.storage_policy_index, coalesce(ps.object_count, 0) AS object_count, coalesce(ps.bytes_used, 0) AS bytes_used FROM container_info ci LEFT JOIN policy_stat ps ON ci.storage_policy_index = ps.storage_policy_index; CREATE TRIGGER container_stat_update INSTEAD OF UPDATE ON container_stat BEGIN UPDATE container_info SET account = NEW.account, container = NEW.container, created_at = NEW.created_at, put_timestamp = NEW.put_timestamp, delete_timestamp = NEW.delete_timestamp, reported_put_timestamp = NEW.reported_put_timestamp, reported_delete_timestamp = NEW.reported_delete_timestamp, reported_object_count = NEW.reported_object_count, reported_bytes_used = NEW.reported_bytes_used, hash = NEW.hash, id = NEW.id, status = NEW.status, status_changed_at = NEW.status_changed_at, metadata = NEW.metadata, x_container_sync_point1 = NEW.x_container_sync_point1, x_container_sync_point2 = NEW.x_container_sync_point2, storage_policy_index = NEW.storage_policy_index, reconciler_sync_point = NEW.reconciler_sync_point; END; ''' def update_new_item_from_existing(new_item, existing): """ Compare the data and meta related timestamps of a new object item with the timestamps of an existing object record, and update the new item with data and/or meta related attributes from the existing record if their timestamps are newer. The multiple timestamps are encoded into a single string for storing in the 'created_at' column of the objects db table. :param new_item: A dict of object update attributes :param existing: A dict of existing object attributes :return: True if any attributes of the new item dict were found to be newer than the existing and therefore not updated, otherwise False implying that the updated item is equal to the existing. """ # item[created_at] may be updated so keep a copy of the original # value in case we process this item again new_item.setdefault('data_timestamp', new_item['created_at']) # content-type and metadata timestamps may be encoded in # item[created_at], or may be set explicitly. 
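    # (note: the explicit 'ctype_timestamp' / 'meta_timestamp' values, when
    # supplied, take precedence below over any values packed into
    # item[created_at])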
item_ts_data, item_ts_ctype, item_ts_meta = decode_timestamps( new_item['data_timestamp']) if new_item.get('ctype_timestamp'): item_ts_ctype = Timestamp(new_item.get('ctype_timestamp')) item_ts_meta = item_ts_ctype if new_item.get('meta_timestamp'): item_ts_meta = Timestamp(new_item.get('meta_timestamp')) if not existing: # encode new_item timestamps into one string for db record new_item['created_at'] = encode_timestamps( item_ts_data, item_ts_ctype, item_ts_meta) return True # decode existing timestamp into separate data, content-type and # metadata timestamps rec_ts_data, rec_ts_ctype, rec_ts_meta = decode_timestamps( existing['created_at']) # Extract any swift_bytes values from the content_type values. This is # necessary because the swift_bytes value to persist should be that at the # most recent data timestamp whereas the content-type value to persist is # that at the most recent content-type timestamp. The two values happen to # be stored in the same database column for historical reasons. for item in (new_item, existing): content_type, swift_bytes = extract_swift_bytes(item['content_type']) item['content_type'] = content_type item['swift_bytes'] = swift_bytes newer_than_existing = [True, True, True] if rec_ts_data >= item_ts_data: # apply data attributes from existing record new_item.update([(k, existing[k]) for k in ('size', 'etag', 'deleted', 'swift_bytes')]) item_ts_data = rec_ts_data newer_than_existing[0] = False if rec_ts_ctype >= item_ts_ctype: # apply content-type attribute from existing record new_item['content_type'] = existing['content_type'] item_ts_ctype = rec_ts_ctype newer_than_existing[1] = False if rec_ts_meta >= item_ts_meta: # apply metadata timestamp from existing record item_ts_meta = rec_ts_meta newer_than_existing[2] = False # encode updated timestamps into one string for db record new_item['created_at'] = encode_timestamps( item_ts_data, item_ts_ctype, item_ts_meta) # append the most recent swift_bytes onto the most recent content_type in # new_item and restore existing to its original state for item in (new_item, existing): if item['swift_bytes']: item['content_type'] += ';swift_bytes=%s' % item['swift_bytes'] del item['swift_bytes'] return any(newer_than_existing) def merge_shards(shard_data, existing): """ Compares ``shard_data`` with ``existing`` and updates ``shard_data`` with any items of ``existing`` that take precedence over the corresponding item in ``shard_data``. :param shard_data: a dict representation of shard range that may be modified by this method. :param existing: a dict representation of shard range. 
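        ``existing`` may be None if no shard range with the same name is
        already known, in which case ``shard_data`` is treated as taking
        precedence.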
:returns: True if ``shard data`` has any item(s) that are considered to take precedence over the corresponding item in ``existing`` """ if not existing: return True if existing['timestamp'] < shard_data['timestamp']: # note that currently we do not roll forward any meta or state from # an item that was created at older time, newer created time trumps shard_data['reported'] = 0 # reset the latch return True elif existing['timestamp'] > shard_data['timestamp']: return False new_content = False # timestamp must be the same, so preserve existing range bounds and deleted for k in ('lower', 'upper', 'deleted'): shard_data[k] = existing[k] # now we need to look for meta data updates if existing['meta_timestamp'] >= shard_data['meta_timestamp']: for k in ('object_count', 'bytes_used', 'meta_timestamp'): shard_data[k] = existing[k] shard_data['tombstones'] = existing.get('tombstones', -1) else: new_content = True # We can latch the reported flag if existing['reported'] and \ existing['object_count'] == shard_data['object_count'] and \ existing['bytes_used'] == shard_data['bytes_used'] and \ existing.get('tombstones', -1) == shard_data['tombstones'] and \ existing['state'] == shard_data['state'] and \ existing['epoch'] == shard_data['epoch']: shard_data['reported'] = 1 else: shard_data.setdefault('reported', 0) if shard_data['reported'] and not existing['reported']: new_content = True if (existing['state_timestamp'] == shard_data['state_timestamp'] and shard_data['state'] > existing['state']): new_content = True elif existing['state_timestamp'] >= shard_data['state_timestamp']: for k in ('state', 'state_timestamp', 'epoch'): shard_data[k] = existing[k] else: new_content = True return new_content def sift_shard_ranges(new_shard_ranges, existing_shard_ranges): """ Compares new and existing shard ranges, updating the new shard ranges with any more recent state from the existing, and returns shard ranges sorted into those that need adding because they contain new or updated state and those that need deleting because their state has been superseded. :param new_shard_ranges: a list of dicts, each of which represents a shard range. :param existing_shard_ranges: a dict mapping shard range names to dicts representing a shard range. :return: a tuple (to_add, to_delete); to_add is a list of dicts, each of which represents a shard range that is to be added to the existing shard ranges; to_delete is a set of shard range names that are to be deleted. """ to_delete = set() to_add = {} for item in new_shard_ranges: item_ident = item['name'] existing = existing_shard_ranges.get(item_ident) if merge_shards(item, existing): # exists with older timestamp if item_ident in existing_shard_ranges: to_delete.add(item_ident) # duplicate entries in item_list if (item_ident not in to_add or merge_shards(item, to_add[item_ident])): to_add[item_ident] = item return to_add.values(), to_delete class ContainerBroker(DatabaseBroker): """ Encapsulates working with a container database. Note that this may involve multiple on-disk DB files if the container becomes sharded: * :attr:`_db_file` is the path to the legacy container DB name, i.e. ``.db``. This file should exist for an initialised broker that has never been sharded, but will not exist once a container has been sharded. * :attr:`db_files` is a list of existing db files for the broker. This list should have at least one entry for an initialised broker, and should have two entries while a broker is in SHARDING state. 
* :attr:`db_file` is the path to whichever db is currently authoritative for the container. Depending on the container's state, this may not be the same as the ``db_file`` argument given to :meth:`~__init__`, unless ``force_db_file`` is True in which case :attr:`db_file` is always equal to the ``db_file`` argument given to :meth:`~__init__`. * :attr:`pending_file` is always equal to :attr:`_db_file` extended with ``.pending``, i.e. ``.db.pending``. """ db_type = 'container' db_contains_type = 'object' db_reclaim_timestamp = 'created_at' delete_meta_whitelist = ['x-container-sysmeta-shard-quoted-root', 'x-container-sysmeta-shard-root', 'x-container-sysmeta-sharding'] def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, account=None, container=None, pending_timeout=None, stale_reads_ok=False, skip_commits=False, force_db_file=False): self._init_db_file = db_file base_db_file = make_db_file_path(db_file, None) super(ContainerBroker, self).__init__( base_db_file, timeout, logger, account, container, pending_timeout, stale_reads_ok, skip_commits=skip_commits) # the root account and container are populated on demand self._root_account = self._root_container = None self._force_db_file = force_db_file self._db_files = None @classmethod def create_broker(cls, device_path, part, account, container, logger=None, epoch=None, put_timestamp=None, storage_policy_index=None): """ Create a ContainerBroker instance. If the db doesn't exist, initialize the db file. :param device_path: device path :param part: partition number :param account: account name string :param container: container name string :param logger: a logger instance :param epoch: a timestamp to include in the db filename :param put_timestamp: initial timestamp if broker needs to be initialized :param storage_policy_index: the storage policy index :return: a tuple of (``broker``, ``initialized``) where ``broker`` is an instance of :class:`swift.container.backend.ContainerBroker` and ``initialized`` is True if the db file was initialized, False otherwise. """ hsh = hash_path(account, container) db_dir = storage_directory(DATADIR, part, hsh) db_path = make_db_file_path( os.path.join(device_path, db_dir, hsh + '.db'), epoch) broker = ContainerBroker(db_path, account=account, container=container, logger=logger) initialized = False if not os.path.exists(broker.db_file): try: broker.initialize(put_timestamp, storage_policy_index) initialized = True except DatabaseAlreadyExists: pass return broker, initialized def get_db_state(self): """ Returns the current state of on disk db files. """ if not self.db_files: return NOTFOUND if len(self.db_files) > 1: return SHARDING if self.db_epoch is None: # never been sharded return UNSHARDED if self.db_epoch != self.get_own_shard_range().epoch: return UNSHARDED if not self.get_shard_ranges(): return COLLAPSED return SHARDED def sharding_initiated(self): """ Returns True if a broker has shard range state that would be necessary for sharding to have been initiated, False otherwise. """ own_shard_range = self.get_own_shard_range() if own_shard_range.state in ShardRange.CLEAVING_STATES: return bool(self.get_shard_ranges()) return False def sharding_required(self): """ Returns True if a broker has shard range state that would be necessary for sharding to have been initiated but has not yet completed sharding, False otherwise. 
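
        In other words, the db is either already in SHARDING state, or is
        still UNSHARDED but :meth:`sharding_initiated` returns True.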
""" db_state = self.get_db_state() return (db_state == SHARDING or (db_state == UNSHARDED and self.sharding_initiated())) def is_sharded(self): return self.get_db_state() == SHARDED def reload_db_files(self): """ Reloads the cached list of valid on disk db files for this broker. """ # reset connection so the next access will use the correct DB file self.conn = None self._db_files = get_db_files(self._init_db_file) @property def db_files(self): """ Gets the cached list of valid db files that exist on disk for this broker. The cached list may be refreshed by calling :meth:`~swift.container.backend.ContainerBroker.reload_db_files`. :return: A list of paths to db files ordered by ascending epoch; the list may be empty. """ if not self._db_files: self.reload_db_files() return self._db_files @property def db_file(self): """ Get the path to the primary db file for this broker. This is typically the db file for the most recent sharding epoch. However, if no db files exist on disk, or if ``force_db_file`` was True when the broker was constructed, then the primary db file is the file passed to the broker constructor. :return: A path to a db file; the file does not necessarily exist. """ if self._force_db_file: return self._init_db_file if self.db_files: return self.db_files[-1] return self._init_db_file @property def db_epoch(self): hash_, epoch, ext = parse_db_filename(self.db_file) return epoch @property def storage_policy_index(self): if not hasattr(self, '_storage_policy_index'): self._storage_policy_index = \ self.get_info()['storage_policy_index'] return self._storage_policy_index @property def path(self): self._populate_instance_cache() return '%s/%s' % (self.account, self.container) def _initialize(self, conn, put_timestamp, storage_policy_index): """ Create a brand new container database (tables, indices, triggers, etc.) """ if not self.account: raise ValueError( 'Attempting to create a new database with no account set') if not self.container: raise ValueError( 'Attempting to create a new database with no container set') if storage_policy_index is None: storage_policy_index = 0 self.create_object_table(conn) self.create_policy_stat_table(conn, storage_policy_index) self.create_container_info_table(conn, put_timestamp, storage_policy_index) self.create_shard_range_table(conn) self._db_files = None def create_object_table(self, conn): """ Create the object table which is specific to the container DB. Not a part of Pluggable Back-ends, internal to the baseline code. :param conn: DB connection object """ conn.executescript(""" CREATE TABLE object ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, created_at TEXT, size INTEGER, content_type TEXT, etag TEXT, deleted INTEGER DEFAULT 0, storage_policy_index INTEGER DEFAULT 0 ); CREATE INDEX ix_object_deleted_name ON object (deleted, name); CREATE TRIGGER object_update BEFORE UPDATE ON object BEGIN SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); END; """ + POLICY_STAT_TRIGGER_SCRIPT) def create_container_info_table(self, conn, put_timestamp, storage_policy_index): """ Create the container_info table which is specific to the container DB. Not a part of Pluggable Back-ends, internal to the baseline code. Also creates the container_stat view. 
:param conn: DB connection object :param put_timestamp: put timestamp :param storage_policy_index: storage policy index """ if put_timestamp is None: put_timestamp = Timestamp(0).internal # The container_stat view is for compatibility; old versions of Swift # expected a container_stat table with columns "object_count" and # "bytes_used", but when that stuff became per-storage-policy and # moved to the policy_stat table, we stopped creating those columns in # container_stat. # # To retain compatibility, we create the container_stat view with some # triggers to make it behave like the old container_stat table. This # way, if an old version of Swift encounters a database with the new # schema, it can still work. # # Note that this can occur during a rolling Swift upgrade if a DB gets # rsynced from an old node to a new, so it's necessary for # availability during upgrades. The fact that it enables downgrades is # a nice bonus. conn.executescript(CONTAINER_INFO_TABLE_SCRIPT + CONTAINER_STAT_VIEW_SCRIPT) conn.execute(""" INSERT INTO container_info (account, container, created_at, id, put_timestamp, status_changed_at, storage_policy_index) VALUES (?, ?, ?, ?, ?, ?, ?); """, (self.account, self.container, Timestamp.now().internal, self._new_db_id(), put_timestamp, put_timestamp, storage_policy_index)) def create_policy_stat_table(self, conn, storage_policy_index=0): """ Create policy_stat table. :param conn: DB connection object :param storage_policy_index: the policy_index the container is being created with """ conn.executescript(POLICY_STAT_TABLE_CREATE) conn.execute(""" INSERT INTO policy_stat (storage_policy_index) VALUES (?) """, (storage_policy_index,)) def create_shard_range_table(self, conn): """ Create the shard_range table which is specific to the container DB. :param conn: DB connection object """ # Use execute (not executescript) so we get the benefits of our # GreenDBConnection. Creating a table requires a whole-DB lock; # *any* in-progress cursor will otherwise trip a "database is locked" # error. 
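        # (schema note: the columns below mirror SHARD_RANGE_KEYS; as with
        # the object table, an UPDATE trigger forces changes to be applied
        # as DELETE plus INSERT rather than in-place updates)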
conn.execute(""" CREATE TABLE %s ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, timestamp TEXT, lower TEXT, upper TEXT, object_count INTEGER DEFAULT 0, bytes_used INTEGER DEFAULT 0, meta_timestamp TEXT, deleted INTEGER DEFAULT 0, state INTEGER, state_timestamp TEXT, epoch TEXT, reported INTEGER DEFAULT 0, tombstones INTEGER DEFAULT -1 ); """ % SHARD_RANGE_TABLE) conn.execute(""" CREATE TRIGGER shard_range_update BEFORE UPDATE ON %s BEGIN SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); END; """ % SHARD_RANGE_TABLE) def get_db_version(self, conn): if self._db_version == -1: self._db_version = 0 for row in conn.execute(''' SELECT name FROM sqlite_master WHERE name = 'ix_object_deleted_name' '''): self._db_version = 1 return self._db_version def _get_deleted_key(self, connection): if self.get_db_version(connection) < 1: return '+deleted' return 'deleted' def _newid(self, conn): conn.execute(''' UPDATE container_stat SET reported_put_timestamp = 0, reported_delete_timestamp = 0, reported_object_count = 0, reported_bytes_used = 0''') def _commit_puts_load(self, item_list, entry): """See :func:`swift.common.db.DatabaseBroker._commit_puts_load`""" (name, timestamp, size, content_type, etag, deleted) = entry[:6] if len(entry) > 6: storage_policy_index = entry[6] else: storage_policy_index = 0 content_type_timestamp = meta_timestamp = None if len(entry) > 7: content_type_timestamp = entry[7] if len(entry) > 8: meta_timestamp = entry[8] item_list.append({'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, 'deleted': deleted, 'storage_policy_index': storage_policy_index, 'ctype_timestamp': content_type_timestamp, 'meta_timestamp': meta_timestamp}) def _empty(self): self._commit_puts_stale_ok() with self.get() as conn: try: row = conn.execute( 'SELECT max(object_count) from policy_stat').fetchone() except sqlite3.OperationalError as err: if not any(msg in str(err) for msg in ( "no such column: storage_policy_index", "no such table: policy_stat")): raise row = conn.execute( 'SELECT object_count from container_stat').fetchone() return zero_like(row[0]) def empty(self): """ Check if container DB is empty. This method uses more stringent checks on object count than :meth:`is_deleted`: this method checks that there are no objects in any policy; if the container is in the process of sharding then both fresh and retiring databases are checked to be empty; if a root container has shard ranges then they are checked to be empty. :returns: True if the database has no active objects, False otherwise """ if not all(broker._empty() for broker in self.get_brokers()): return False if self.is_root_container() and self.sharding_initiated(): # sharded shards don't get updates from their shards so their shard # usage should not be relied upon return self.get_shard_usage()['object_count'] <= 0 return True def delete_object(self, name, timestamp, storage_policy_index=0): """ Mark an object deleted. 
:param name: object name to be deleted :param timestamp: timestamp when the object was marked as deleted :param storage_policy_index: the storage policy index for the object """ self.put_object(name, timestamp, 0, 'application/deleted', 'noetag', deleted=1, storage_policy_index=storage_policy_index) def make_tuple_for_pickle(self, record): return (record['name'], record['created_at'], record['size'], record['content_type'], record['etag'], record['deleted'], record['storage_policy_index'], record['ctype_timestamp'], record['meta_timestamp']) def put_object(self, name, timestamp, size, content_type, etag, deleted=0, storage_policy_index=0, ctype_timestamp=None, meta_timestamp=None): """ Creates an object in the DB with its metadata. :param name: object name to be created :param timestamp: timestamp of when the object was created :param size: object size :param content_type: object content-type :param etag: object etag :param deleted: if True, marks the object as deleted and sets the deleted_at timestamp to timestamp :param storage_policy_index: the storage policy index for the object :param ctype_timestamp: timestamp of when content_type was last updated :param meta_timestamp: timestamp of when metadata was last updated """ record = {'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, 'deleted': deleted, 'storage_policy_index': storage_policy_index, 'ctype_timestamp': ctype_timestamp, 'meta_timestamp': meta_timestamp} self.put_record(record) def remove_objects(self, lower, upper, max_row=None): """ Removes object records in the given namespace range from the object table. Note that objects are removed regardless of their storage_policy_index. :param lower: defines the lower bound of object names that will be removed; names greater than this value will be removed; names less than or equal to this value will not be removed. :param upper: defines the upper bound of object names that will be removed; names less than or equal to this value will be removed; names greater than this value will not be removed. The empty string is interpreted as there being no upper bound. :param max_row: if specified only rows less than or equal to max_row will be removed """ query_conditions = [] query_args = [] if max_row is not None: query_conditions.append('ROWID <= ?') query_args.append(str(max_row)) if lower: query_conditions.append('name > ?') query_args.append(lower) if upper: query_conditions.append('name <= ?') query_args.append(upper) query = 'DELETE FROM object WHERE deleted in (0, 1)' if query_conditions: query += ' AND ' + ' AND '.join(query_conditions) with self.get() as conn: conn.execute(query, query_args) conn.commit() def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp, **kwargs): """ Apply delete logic to database info. :returns: True if the DB is considered to be deleted, False otherwise """ # The container is considered deleted if the delete_timestamp # value is greater than the put_timestamp, and there are no # objects in the container. return zero_like(object_count) and ( Timestamp(delete_timestamp) > Timestamp(put_timestamp)) def _is_deleted(self, conn): """ Check if the DB is considered to be deleted. This object count used in this check is the same as the container object count that would be returned in the result of :meth:`get_info` and exposed to a client i.e. it is based on the container_stat view for the current storage policy index or relevant shard range usage. 
:param conn: database conn :returns: True if the DB is considered to be deleted, False otherwise """ info = conn.execute(''' SELECT put_timestamp, delete_timestamp, object_count FROM container_stat''').fetchone() info = dict(info) info.update(self._get_alternate_object_stats()[1]) return self._is_deleted_info(**info) def is_old_enough_to_reclaim(self, now, reclaim_age): with self.get() as conn: info = conn.execute(''' SELECT put_timestamp, delete_timestamp FROM container_stat''').fetchone() return (Timestamp(now - reclaim_age) > Timestamp(info['delete_timestamp']) > Timestamp(info['put_timestamp'])) def is_empty_enough_to_reclaim(self): if self.is_root_container() and (self.get_shard_ranges() or self.get_db_state() == SHARDING): return False return self.empty() def is_reclaimable(self, now, reclaim_age): return self.is_old_enough_to_reclaim(now, reclaim_age) and \ self.is_empty_enough_to_reclaim() def get_info_is_deleted(self): """ Get the is_deleted status and info for the container. :returns: a tuple, in the form (info, is_deleted) info is a dict as returned by get_info and is_deleted is a boolean. """ if not os.path.exists(self.db_file): return {}, True info = self.get_info() return info, self._is_deleted_info(**info) def get_replication_info(self): info = super(ContainerBroker, self).get_replication_info() info['shard_max_row'] = self.get_max_row(SHARD_RANGE_TABLE) return info def _do_get_info_query(self, conn): data = None trailing_sync = 'x_container_sync_point1, x_container_sync_point2' trailing_pol = 'storage_policy_index' errors = set() while not data: try: data = conn.execute((''' SELECT account, container, created_at, put_timestamp, delete_timestamp, status, status_changed_at, object_count, bytes_used, reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, id, %s, %s FROM container_stat ''') % (trailing_sync, trailing_pol)).fetchone() except sqlite3.OperationalError as err: err_msg = str(err) if err_msg in errors: # only attempt migration once raise errors.add(err_msg) if 'no such column: storage_policy_index' in err_msg: trailing_pol = '0 AS storage_policy_index' elif 'no such column: x_container_sync_point' in err_msg: trailing_sync = '-1 AS x_container_sync_point1, ' \ '-1 AS x_container_sync_point2' else: raise data = dict(data) # populate instance cache self._storage_policy_index = data['storage_policy_index'] self.account = data['account'] self.container = data['container'] return data def _get_info(self): self._commit_puts_stale_ok() with self.get() as conn: return self._do_get_info_query(conn) def _populate_instance_cache(self, conn=None): # load cached instance attributes from the database if necessary if self.container is None: with self.maybe_get(conn) as conn: self._do_get_info_query(conn) def _get_alternate_object_stats(self): state = self.get_db_state() if state == SHARDING: other_info = self.get_brokers()[0]._get_info() stats = {'object_count': other_info['object_count'], 'bytes_used': other_info['bytes_used']} elif state == SHARDED and self.is_root_container(): stats = self.get_shard_usage() else: stats = {} return state, stats def get_info(self): """ Get global data for the container. :returns: dict with keys: account, container, created_at, put_timestamp, delete_timestamp, status, status_changed_at, object_count, bytes_used, reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, id, x_container_sync_point1, x_container_sync_point2, and storage_policy_index, db_state. 
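
        While the db is in SHARDING state the object stats are taken from
        the retiring db; for a sharded root container they are aggregated
        from the shard ranges (see :meth:`_get_alternate_object_stats`).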
""" data = self._get_info() state, stats = self._get_alternate_object_stats() data.update(stats) data['db_state'] = state return data def set_x_container_sync_points(self, sync_point1, sync_point2): with self.get() as conn: try: self._set_x_container_sync_points(conn, sync_point1, sync_point2) except sqlite3.OperationalError as err: if 'no such column: x_container_sync_point' not in \ str(err): raise self._migrate_add_container_sync_points(conn) self._set_x_container_sync_points(conn, sync_point1, sync_point2) conn.commit() def _set_x_container_sync_points(self, conn, sync_point1, sync_point2): if sync_point1 is not None and sync_point2 is not None: conn.execute(''' UPDATE container_stat SET x_container_sync_point1 = ?, x_container_sync_point2 = ? ''', (sync_point1, sync_point2)) elif sync_point1 is not None: conn.execute(''' UPDATE container_stat SET x_container_sync_point1 = ? ''', (sync_point1,)) elif sync_point2 is not None: conn.execute(''' UPDATE container_stat SET x_container_sync_point2 = ? ''', (sync_point2,)) def get_policy_stats(self): with self.get() as conn: try: info = conn.execute(''' SELECT storage_policy_index, object_count, bytes_used FROM policy_stat ''').fetchall() except sqlite3.OperationalError as err: if not any(msg in str(err) for msg in ( "no such column: storage_policy_index", "no such table: policy_stat")): raise info = conn.execute(''' SELECT 0 as storage_policy_index, object_count, bytes_used FROM container_stat ''').fetchall() policy_stats = {} for row in info: stats = dict(row) key = stats.pop('storage_policy_index') policy_stats[key] = stats return policy_stats def has_multiple_policies(self): with self.get() as conn: try: curs = conn.execute(''' SELECT count(storage_policy_index) FROM policy_stat ''').fetchone() except sqlite3.OperationalError as err: if 'no such table: policy_stat' not in str(err): raise # no policy_stat row return False if curs and curs[0] > 1: return True # only one policy_stat row return False def set_storage_policy_index(self, policy_index, timestamp=None): """ Update the container_stat policy_index and status_changed_at. """ if timestamp is None: timestamp = Timestamp.now().internal def _setit(conn): conn.execute(''' INSERT OR IGNORE INTO policy_stat (storage_policy_index) VALUES (?) ''', (policy_index,)) conn.execute(''' UPDATE container_stat SET storage_policy_index = ?, status_changed_at = MAX(?, status_changed_at) WHERE storage_policy_index <> ? ''', (policy_index, timestamp, policy_index)) conn.commit() with self.get() as conn: try: _setit(conn) except sqlite3.OperationalError as err: if not any(msg in str(err) for msg in ( "no such column: storage_policy_index", "no such table: policy_stat")): raise self._migrate_add_storage_policy(conn) _setit(conn) self._storage_policy_index = policy_index def reported(self, put_timestamp, delete_timestamp, object_count, bytes_used): """ Update reported stats, available with container's `get_info`. :param put_timestamp: put_timestamp to update :param delete_timestamp: delete_timestamp to update :param object_count: object_count to update :param bytes_used: bytes_used to update """ with self.get() as conn: conn.execute(''' UPDATE container_stat SET reported_put_timestamp = ?, reported_delete_timestamp = ?, reported_object_count = ?, reported_bytes_used = ? 
''', (put_timestamp, delete_timestamp, object_count, bytes_used)) conn.commit() def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, path=None, storage_policy_index=0, reverse=False, include_deleted=False, since_row=None, transform_func=None, all_policies=False, allow_reserved=False): """ Get a list of objects sorted by name starting at marker onward, up to limit entries. Entries will begin with the prefix and will not have the delimiter after the prefix. :param limit: maximum number of entries to get :param marker: marker query :param end_marker: end marker query :param prefix: prefix query :param delimiter: delimiter for query :param path: if defined, will set the prefix and delimiter based on the path :param storage_policy_index: storage policy index for query :param reverse: reverse the result order. :param include_deleted: if True, include only deleted objects; if False (default), include only undeleted objects; otherwise, include both deleted and undeleted objects. :param since_row: include only items whose ROWID is greater than the given row id; by default all rows are included. :param transform_func: an optional function that if given will be called for each object to get a transformed version of the object to include in the listing; should have same signature as :meth:`~_transform_record`; defaults to :meth:`~_transform_record`. :param all_policies: if True, include objects for all storage policies ignoring any value given for ``storage_policy_index`` :param allow_reserved: exclude names with reserved-byte by default :returns: list of tuples of (name, created_at, size, content_type, etag, deleted) """ if include_deleted is True: deleted_arg = ' = 1' elif include_deleted is False: deleted_arg = ' = 0' else: deleted_arg = ' in (0, 1)' if transform_func is None: transform_func = self._transform_record delim_force_gte = False if six.PY2: (marker, end_marker, prefix, delimiter, path) = utf8encode( marker, end_marker, prefix, delimiter, path) self._commit_puts_stale_ok() if reverse: # Reverse the markers if we are reversing the listing. marker, end_marker = end_marker, marker if path is not None: prefix = path if path: prefix = path = path.rstrip('/') + '/' delimiter = '/' elif delimiter and not prefix: prefix = '' if prefix: end_prefix = prefix[:-1] + chr(ord(prefix[-1]) + 1) orig_marker = marker with self.get() as conn: results = [] deleted_key = self._get_deleted_key(conn) query_keys = ['name', 'created_at', 'size', 'content_type', 'etag', deleted_key] while len(results) < limit: query_args = [] query_conditions = [] if end_marker and (not prefix or end_marker < end_prefix): query_conditions.append('name < ?') query_args.append(end_marker) elif prefix: query_conditions.append('name < ?') query_args.append(end_prefix) if delim_force_gte: query_conditions.append('name >= ?') query_args.append(marker) # Always set back to False delim_force_gte = False elif marker and (not prefix or marker >= prefix): query_conditions.append('name > ?') query_args.append(marker) elif prefix: query_conditions.append('name >= ?') query_args.append(prefix) if not allow_reserved: query_conditions.append('name >= ?') query_args.append(chr(ord(RESERVED_BYTE) + 1)) query_conditions.append(deleted_key + deleted_arg) if since_row: query_conditions.append('ROWID > ?') query_args.append(since_row) def build_query(keys, conditions, args): query = 'SELECT ' + ', '.join(keys) + ' FROM object ' if conditions: query += 'WHERE ' + ' AND '.join(conditions) tail_query = ''' ORDER BY name %s LIMIT ? 
''' % ('DESC' if reverse else '') return query + tail_query, args + [limit - len(results)] # storage policy filter if all_policies: query, args = build_query( query_keys + ['storage_policy_index'], query_conditions, query_args) else: query, args = build_query( query_keys + ['storage_policy_index'], query_conditions + ['storage_policy_index = ?'], query_args + [storage_policy_index]) try: curs = conn.execute(query, tuple(args)) except sqlite3.OperationalError as err: if 'no such column: storage_policy_index' not in str(err): raise query, args = build_query( query_keys + ['0 as storage_policy_index'], query_conditions, query_args) curs = conn.execute(query, tuple(args)) curs.row_factory = None # Delimiters without a prefix is ignored, further if there # is no delimiter then we can simply return the result as # prefixes are now handled in the SQL statement. if prefix is None or not delimiter: return [transform_func(r) for r in curs] # We have a delimiter and a prefix (possibly empty string) to # handle rowcount = 0 for row in curs: rowcount += 1 name = row[0] if reverse: end_marker = name else: marker = name if len(results) >= limit: curs.close() return results end = name.find(delimiter, len(prefix)) if path is not None: if name == path: continue if end >= 0 and len(name) > end + len(delimiter): if reverse: end_marker = name[:end + len(delimiter)] else: marker = ''.join([ name[:end], delimiter[:-1], chr(ord(delimiter[-1:]) + 1), ]) curs.close() break elif end >= 0: if reverse: end_marker = name[:end + len(delimiter)] else: marker = ''.join([ name[:end], delimiter[:-1], chr(ord(delimiter[-1:]) + 1), ]) # we want result to be inclusive of delim+1 delim_force_gte = True dir_name = name[:end + len(delimiter)] if dir_name != orig_marker: results.append([dir_name, '0', 0, None, '']) curs.close() break results.append(transform_func(row)) if not rowcount: break return results def get_objects(self, limit=None, marker='', end_marker='', include_deleted=None, since_row=None): """ Returns a list of objects, including deleted objects, in all policies. Each object in the list is described by a dict with keys {'name', 'created_at', 'size', 'content_type', 'etag', 'deleted', 'storage_policy_index'}. :param limit: maximum number of entries to get :param marker: if set, objects with names less than or equal to this value will not be included in the list. :param end_marker: if set, objects with names greater than or equal to this value will not be included in the list. :param include_deleted: if True, include only deleted objects; if False, include only undeleted objects; otherwise (default), include both deleted and undeleted objects. :param since_row: include only items whose ROWID is greater than the given row id; by default all rows are included. :return: a list of dicts, each describing an object. """ limit = CONTAINER_LISTING_LIMIT if limit is None else limit return self.list_objects_iter( limit, marker, end_marker, prefix=None, delimiter=None, path=None, reverse=False, include_deleted=include_deleted, transform_func=self._record_to_dict, since_row=since_row, all_policies=True, allow_reserved=True ) def _transform_record(self, record): """ Returns a tuple of (name, last-modified time, size, content_type and etag) for the given record. The given record's created_at timestamp is decoded into separate data, content-type and meta timestamps and the metadata timestamp is used as the last-modified time value. 
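
        Note that the record's 'deleted' flag and any trailing
        storage_policy_index are not included in the returned tuple.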
""" t_data, t_ctype, t_meta = decode_timestamps(record[1]) return (record[0], t_meta.internal) + record[2:5] def _record_to_dict(self, rec): if rec: keys = ('name', 'created_at', 'size', 'content_type', 'etag', 'deleted', 'storage_policy_index') return dict(zip(keys, rec)) return None def merge_items(self, item_list, source=None): """ Merge items into the object table. :param item_list: list of dictionaries of {'name', 'created_at', 'size', 'content_type', 'etag', 'deleted', 'storage_policy_index', 'ctype_timestamp', 'meta_timestamp'} :param source: if defined, update incoming_sync with the source """ for item in item_list: if six.PY2 and isinstance(item['name'], six.text_type): item['name'] = item['name'].encode('utf-8') elif not six.PY2 and isinstance(item['name'], six.binary_type): item['name'] = item['name'].decode('utf-8') def _really_really_merge_items(conn): curs = conn.cursor() if self.get_db_version(conn) >= 1: query_mod = ' deleted IN (0, 1) AND ' else: query_mod = '' curs.execute('BEGIN IMMEDIATE') # Get sqlite records for objects in item_list that already exist. # We must chunk it up to avoid sqlite's limit of 999 args. records = {} for offset in range(0, len(item_list), SQLITE_ARG_LIMIT): chunk = [rec['name'] for rec in item_list[offset:offset + SQLITE_ARG_LIMIT]] records.update( ((rec[0], rec[6]), rec) for rec in curs.execute( 'SELECT name, created_at, size, content_type,' 'etag, deleted, storage_policy_index ' 'FROM object WHERE ' + query_mod + ' name IN (%s)' % ','.join('?' * len(chunk)), chunk)) # Sort item_list into things that need adding and deleting, based # on results of created_at query. to_delete = set() to_add = {} for item in item_list: item.setdefault('storage_policy_index', 0) # legacy item_ident = (item['name'], item['storage_policy_index']) existing = self._record_to_dict(records.get(item_ident)) if update_new_item_from_existing(item, existing): if item_ident in records: # exists with older timestamp to_delete.add(item_ident) if item_ident in to_add: # duplicate entries in item_list update_new_item_from_existing(item, to_add[item_ident]) to_add[item_ident] = item if to_delete: curs.executemany( 'DELETE FROM object WHERE ' + query_mod + 'name=? AND storage_policy_index=?', (item_ident for item_ident in to_delete)) if to_add: curs.executemany( 'INSERT INTO object (name, created_at, size, content_type,' 'etag, deleted, storage_policy_index) ' 'VALUES (?, ?, ?, ?, ?, ?, ?)', ((rec['name'], rec['created_at'], rec['size'], rec['content_type'], rec['etag'], rec['deleted'], rec['storage_policy_index']) for rec in to_add.values())) if source: # for replication we rely on the remote end sending merges in # order with no gaps to increment sync_points sync_point = item_list[-1]['ROWID'] curs.execute(''' UPDATE incoming_sync SET sync_point=max(?, sync_point) WHERE remote_id=? ''', (sync_point, source)) if curs.rowcount < 1: curs.execute(''' INSERT INTO incoming_sync (sync_point, remote_id) VALUES (?, ?) ''', (sync_point, source)) conn.commit() def _really_merge_items(conn): return tpool.execute(_really_really_merge_items, conn) with self.get() as conn: try: return _really_merge_items(conn) except sqlite3.OperationalError as err: if 'no such column: storage_policy_index' not in str(err): raise self._migrate_add_storage_policy(conn) return _really_merge_items(conn) def merge_shard_ranges(self, shard_ranges): """ Merge shard ranges into the shard range table. 
:param shard_ranges: a shard range or a list of shard ranges; each shard range should be an instance of :class:`~swift.common.utils.ShardRange` or a dict representation of a shard range having ``SHARD_RANGE_KEYS``. """ if not shard_ranges: return if not isinstance(shard_ranges, (list, ShardRangeList)): shard_ranges = [shard_ranges] item_list = [] for item in shard_ranges: if isinstance(item, ShardRange): item = dict(item) for col in ('name', 'lower', 'upper'): if six.PY2 and isinstance(item[col], six.text_type): item[col] = item[col].encode('utf-8') elif not six.PY2 and isinstance(item[col], six.binary_type): item[col] = item[col].decode('utf-8') item_list.append(item) def _really_merge_items(conn): curs = conn.cursor() curs.execute('BEGIN IMMEDIATE') # Get rows for items that already exist. # We must chunk it up to avoid sqlite's limit of 999 args. records = {} for offset in range(0, len(item_list), SQLITE_ARG_LIMIT): chunk = [record['name'] for record in item_list[offset:offset + SQLITE_ARG_LIMIT]] records.update( (rec[0], dict(zip(SHARD_RANGE_KEYS, rec))) for rec in curs.execute( 'SELECT %s FROM %s ' 'WHERE deleted IN (0, 1) AND name IN (%s)' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, ','.join('?' * len(chunk))), chunk)) to_add, to_delete = sift_shard_ranges(item_list, records) if to_delete: curs.executemany( 'DELETE FROM %s WHERE deleted in (0, 1) ' 'AND name = ?' % SHARD_RANGE_TABLE, ((item_ident,) for item_ident in to_delete)) if to_add: vals = ','.join('?' * len(SHARD_RANGE_KEYS)) curs.executemany( 'INSERT INTO %s (%s) VALUES (%s)' % (SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals), tuple([item[k] for k in SHARD_RANGE_KEYS] for item in to_add)) conn.commit() migrations = { 'no such column: reported': self._migrate_add_shard_range_reported, 'no such column: tombstones': self._migrate_add_shard_range_tombstones, ('no such table: %s' % SHARD_RANGE_TABLE): self.create_shard_range_table, } migrations_done = set() with self.get() as conn: while True: try: return _really_merge_items(conn) except sqlite3.OperationalError as err: # Without the rollback, new enough (>= py37) python/sqlite3 # will panic: # sqlite3.OperationalError: cannot start a transaction # within a transaction conn.rollback() for err_str, migration in migrations.items(): if err_str in migrations_done: continue if err_str in str(err): migration(conn) migrations_done.add(err_str) break else: raise def get_reconciler_sync(self): with self.get() as conn: try: return conn.execute(''' SELECT reconciler_sync_point FROM container_stat ''').fetchone()[0] except sqlite3.OperationalError as err: if "no such column: reconciler_sync_point" not in str(err): raise return -1 def update_reconciler_sync(self, point): query = ''' UPDATE container_stat SET reconciler_sync_point = ? ''' with self.get() as conn: try: conn.execute(query, (point,)) except sqlite3.OperationalError as err: if "no such column: reconciler_sync_point" not in str(err): raise self._migrate_add_storage_policy(conn) conn.execute(query, (point,)) conn.commit() def get_misplaced_since(self, start, count): """ Get a list of objects which are in a storage policy different from the container's storage policy. :param start: last reconciler sync point :param count: maximum number of entries to get :returns: list of dicts with keys: name, created_at, size, content_type, etag, storage_policy_index """ qry = ''' SELECT ROWID, name, created_at, size, content_type, etag, deleted, storage_policy_index FROM object WHERE ROWID > ? 
AND storage_policy_index != ( SELECT storage_policy_index FROM container_stat LIMIT 1) ORDER BY ROWID ASC LIMIT ? ''' self._commit_puts_stale_ok() with self.get() as conn: try: cur = conn.execute(qry, (start, count)) except sqlite3.OperationalError as err: if "no such column: storage_policy_index" not in str(err): raise return [] return list(dict(row) for row in cur.fetchall()) def _migrate_add_container_sync_points(self, conn): """ Add the x_container_sync_point columns to the 'container_stat' table. """ conn.executescript(''' BEGIN; ALTER TABLE container_stat ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1; ALTER TABLE container_stat ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1; COMMIT; ''') def _migrate_add_storage_policy(self, conn): """ Migrate the container schema to support tracking objects from multiple storage policies. If the container_stat table has any pending migrations, they are applied now before copying into container_info. * create the 'policy_stat' table. * copy the current 'object_count' and 'bytes_used' columns to a row in the 'policy_stat' table. * add the storage_policy_index column to the 'object' table. * drop the 'object_insert' and 'object_delete' triggers. * add the 'object_insert_policy_stat' and 'object_delete_policy_stat' triggers. * create container_info table for non-policy container info * insert values from container_stat into container_info * drop container_stat table * create container_stat view """ # I tried just getting the list of column names in the current # container_stat table with a pragma table_info, but could never get # it inside the same transaction as the DDL (non-DML) statements: # https://docs.python.org/2/library/sqlite3.html # #controlling-transactions # So we just apply all pending migrations to container_stat and copy a # static known list of column names into container_info. try: self._migrate_add_container_sync_points(conn) except sqlite3.OperationalError as e: if 'duplicate column' in str(e): conn.execute('ROLLBACK;') else: raise try: conn.executescript(""" ALTER TABLE container_stat ADD COLUMN metadata TEXT DEFAULT ''; """) except sqlite3.OperationalError as e: if 'duplicate column' not in str(e): raise column_names = ', '.join(( 'account', 'container', 'created_at', 'put_timestamp', 'delete_timestamp', 'reported_put_timestamp', 'reported_object_count', 'reported_bytes_used', 'hash', 'id', 'status', 'status_changed_at', 'metadata', 'x_container_sync_point1', 'x_container_sync_point2')) conn.executescript( 'BEGIN;' + POLICY_STAT_TABLE_CREATE + ''' INSERT INTO policy_stat ( storage_policy_index, object_count, bytes_used) SELECT 0, object_count, bytes_used FROM container_stat; ALTER TABLE object ADD COLUMN storage_policy_index INTEGER DEFAULT 0; DROP TRIGGER object_insert; DROP TRIGGER object_delete; ''' + POLICY_STAT_TRIGGER_SCRIPT + CONTAINER_INFO_TABLE_SCRIPT + ''' INSERT INTO container_info (%s) SELECT %s FROM container_stat; DROP TABLE IF EXISTS container_stat; ''' % (column_names, column_names) + CONTAINER_STAT_VIEW_SCRIPT + 'COMMIT;') def _migrate_add_shard_range_reported(self, conn): """ Add the reported column to the 'shard_range' table. """ conn.executescript(''' BEGIN; ALTER TABLE %s ADD COLUMN reported INTEGER DEFAULT 0; COMMIT; ''' % SHARD_RANGE_TABLE) def _migrate_add_shard_range_tombstones(self, conn): """ Add the tombstones column to the 'shard_range' table. 
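
        The new column defaults to -1, matching the schema created by
        :meth:`create_shard_range_table`.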
""" conn.executescript(''' BEGIN; ALTER TABLE %s ADD COLUMN tombstones INTEGER DEFAULT -1; COMMIT; ''' % SHARD_RANGE_TABLE) def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): super(ContainerBroker, self)._reclaim_other_stuff( conn, age_timestamp, sync_timestamp) # populate instance cache, but use existing conn to avoid deadlock # when it has a pending update self._populate_instance_cache(conn=conn) try: conn.execute(''' DELETE FROM %s WHERE deleted = 1 AND timestamp < ? AND name != ? ''' % SHARD_RANGE_TABLE, (sync_timestamp, self.path)) except sqlite3.OperationalError as err: if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): raise def _get_shard_range_rows(self, connection=None, includes=None, include_deleted=False, states=None, include_own=False, exclude_others=False): """ Returns a list of shard range rows. To get all shard ranges use ``include_own=True``. To get only the broker's own shard range use ``include_own=True`` and ``exclude_others=True``. :param connection: db connection :param includes: restricts the returned list to the shard range that includes the given value :param include_deleted: include rows marked as deleted :param states: include only rows matching the given state(s); can be an int or a list of ints. :param include_own: boolean that governs whether the row whose name matches the broker's path is included in the returned list. If True, that row is included, otherwise it is not included. Default is False. :param exclude_others: boolean that governs whether the rows whose names do not match the broker's path are included in the returned list. If True, those rows are not included, otherwise they are included. Default is False. :return: a list of tuples. """ if exclude_others and not include_own: return [] included_states = set() if isinstance(states, (list, tuple, set)): included_states.update(states) elif states is not None: included_states.add(states) # defaults to be used when legacy db's are missing columns default_values = {'reported': 0, 'tombstones': -1} def do_query(conn, defaults=None): condition = '' conditions = [] params = [] if not include_deleted: conditions.append('deleted=0') if included_states: conditions.append('state in (%s)' % ','.join( '?' 
* len(included_states))) params.extend(included_states) if not include_own: conditions.append('name != ?') params.append(self.path) if exclude_others: conditions.append('name = ?') params.append(self.path) if includes is not None: conditions.extend(('lower < ?', "(upper = '' OR upper >= ?)")) params.extend((includes, includes)) if conditions: condition = ' WHERE ' + ' AND '.join(conditions) columns = SHARD_RANGE_KEYS[:-2] for column in SHARD_RANGE_KEYS[-2:]: if column in defaults: columns += (('%s as %s' % (default_values[column], column)),) else: columns += (column,) sql = ''' SELECT %s FROM %s%s; ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition) data = conn.execute(sql, params) data.row_factory = None return [row for row in data] with self.maybe_get(connection) as conn: defaults = set() attempts = len(default_values) + 1 while attempts: attempts -= 1 try: return do_query(conn, defaults) except sqlite3.OperationalError as err: if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): return [] if not attempts: raise new_defaults = set() for column in default_values.keys(): if 'no such column: %s' % column in str(err): new_defaults.add(column) if not new_defaults: raise if new_defaults.intersection(defaults): raise defaults.update(new_defaults) @classmethod def resolve_shard_range_states(cls, states): """ Given a list of values each of which may be the name of a state, the number of a state, or an alias, return the set of state numbers described by the list. The following alias values are supported: 'listing' maps to all states that are considered valid when listing objects; 'updating' maps to all states that are considered valid for redirecting an object update; 'auditing' maps to all states that are considered valid for a shard container that is updating its own shard range table from a root (this currently maps to all states except FOUND). :param states: a list of values each of which may be the name of a state, the number of a state, or an alias :return: a set of integer state numbers, or None if no states are given :raises ValueError: if any value in the given list is neither a valid state nor a valid alias """ if states: resolved_states = set() for state in states: if state == 'listing': resolved_states.update(SHARD_LISTING_STATES) elif state == 'updating': resolved_states.update(SHARD_UPDATE_STATES) elif state == 'auditing': resolved_states.update(SHARD_AUDITING_STATES) else: resolved_states.add(ShardRange.resolve_state(state)[0]) return resolved_states return None def get_shard_ranges(self, marker=None, end_marker=None, includes=None, reverse=False, include_deleted=False, states=None, include_own=False, exclude_others=False, fill_gaps=False): """ Returns a list of persisted shard ranges. :param marker: restricts the returned list to shard ranges whose namespace includes or is greater than the marker value. :param end_marker: restricts the returned list to shard ranges whose namespace includes or is less than the end_marker value. :param includes: restricts the returned list to the shard range that includes the given value; if ``includes`` is specified then ``marker`` and ``end_marker`` are ignored. :param reverse: reverse the result order. :param include_deleted: include items that have the delete marker set :param states: if specified, restricts the returned list to shard ranges that have the given state(s); can be a list of ints or a single int. :param include_own: boolean that governs whether the row whose name matches the broker's path is included in the returned list. 
    def get_shard_ranges(self, marker=None, end_marker=None, includes=None,
                         reverse=False, include_deleted=False, states=None,
                         include_own=False, exclude_others=False,
                         fill_gaps=False):
        """
        Returns a list of persisted shard ranges.

        :param marker: restricts the returned list to shard ranges whose
            namespace includes or is greater than the marker value.
        :param end_marker: restricts the returned list to shard ranges whose
            namespace includes or is less than the end_marker value.
        :param includes: restricts the returned list to the shard range that
            includes the given value; if ``includes`` is specified then
            ``marker`` and ``end_marker`` are ignored.
        :param reverse: reverse the result order.
        :param include_deleted: include items that have the delete marker set
        :param states: if specified, restricts the returned list to shard
            ranges that have the given state(s); can be a list of ints or a
            single int.
        :param include_own: boolean that governs whether the row whose name
            matches the broker's path is included in the returned list. If
            True, that row is included, otherwise it is not included. Default
            is False.
        :param exclude_others: boolean that governs whether the rows whose
            names do not match the broker's path are included in the returned
            list. If True, those rows are not included, otherwise they are
            included. Default is False.
        :param fill_gaps: if True, insert a modified copy of own shard range
            to fill any gap between the end of any found shard ranges and the
            upper bound of own shard range. Gaps enclosed within the found
            shard ranges are not filled.
        :return: a list of instances of :class:`swift.common.utils.ShardRange`
        """
        if reverse:
            marker, end_marker = end_marker, marker
        if marker and end_marker and marker >= end_marker:
            return []

        shard_ranges = [
            ShardRange(*row)
            for row in self._get_shard_range_rows(
                includes=includes, include_deleted=include_deleted,
                states=states, include_own=include_own,
                exclude_others=exclude_others)]
        shard_ranges.sort(key=ShardRange.sort_key)
        if includes:
            return shard_ranges[:1] if shard_ranges else []

        shard_ranges = filter_namespaces(
            shard_ranges, includes, marker, end_marker)

        if fill_gaps:
            own_shard_range = self.get_own_shard_range()
            if shard_ranges:
                last_upper = shard_ranges[-1].upper
            else:
                last_upper = max(marker or own_shard_range.lower,
                                 own_shard_range.lower)
            required_upper = min(end_marker or own_shard_range.upper,
                                 own_shard_range.upper)
            if required_upper > last_upper:
                filler_sr = own_shard_range
                filler_sr.lower = last_upper
                filler_sr.upper = required_upper
                shard_ranges.append(filler_sr)

        if reverse:
            shard_ranges.reverse()

        return shard_ranges

    def get_own_shard_range(self, no_default=False):
        """
        Returns a shard range representing this broker's own shard range. If
        no such range has been persisted in the broker's shard ranges table
        then a default shard range representing the entire namespace will be
        returned.

        The ``object_count`` and ``bytes_used`` of the returned shard range
        are not guaranteed to be up-to-date with the current object stats for
        this broker. Callers that require up-to-date stats should use the
        ``get_info`` method.

        :param no_default: if True and the broker's own shard range is not
            found in the shard ranges table then None is returned, otherwise a
            default shard range is returned.
        :return: an instance of :class:`~swift.common.utils.ShardRange`
        """
        shard_ranges = self.get_shard_ranges(include_own=True,
                                             include_deleted=True,
                                             exclude_others=True)
        if shard_ranges:
            own_shard_range = shard_ranges[0]
        elif no_default:
            return None
        else:
            own_shard_range = ShardRange(
                self.path, Timestamp.now(), ShardRange.MIN, ShardRange.MAX,
                state=ShardRange.ACTIVE)
        return own_shard_range

    def is_own_shard_range(self, shard_range):
        return shard_range.name == self.path

    def enable_sharding(self, epoch):
        """
        Updates this broker's own shard range with the given epoch, sets its
        state to SHARDING and persists it in the DB.

        :param epoch: a :class:`~swift.common.utils.Timestamp`
        :return: the broker's updated own shard range.
        """
        own_shard_range = self.get_own_shard_range()
        own_shard_range.update_state(ShardRange.SHARDING, epoch)
        own_shard_range.epoch = epoch
        self.merge_shard_ranges(own_shard_range)
        return own_shard_range
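
    # A minimal usage sketch, assuming ``broker`` is an existing
    # ContainerBroker for a root container (the marker values below are made
    # up for illustration):
    #
    #   own_sr = broker.get_own_shard_range()
    #   listing_srs = broker.get_shard_ranges(
    #       marker='cat', end_marker='giraffe',
    #       states=SHARD_LISTING_STATES, fill_gaps=True)
    #
    # With ``fill_gaps=True`` any gap between the last found range and
    # ``own_sr.upper`` is covered by a filler range derived from the broker's
    # own shard range.
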
    def get_shard_usage(self):
        """
        Get the aggregate object stats for all shard ranges in states ACTIVE,
        SHARDING or SHRINKING.

        :return: a dict with keys {bytes_used, object_count}
        """
        shard_ranges = self.get_shard_ranges(states=SHARD_STATS_STATES)
        return {'bytes_used': sum(sr.bytes_used for sr in shard_ranges),
                'object_count': sum(sr.object_count for sr in shard_ranges)}

    def get_all_shard_range_data(self):
        """
        Returns a list of all shard range data, including own shard range and
        deleted shard ranges.

        :return: A list of dict representations of a ShardRange.
        """
        shard_ranges = self.get_shard_ranges(include_deleted=True,
                                             include_own=True)
        return [dict(sr) for sr in shard_ranges]

    def set_sharding_state(self):
        """
        Creates and initializes a fresh DB file in preparation for sharding a
        retiring DB. The broker's own shard range must have an epoch timestamp
        for this method to succeed.

        :return: True if the fresh DB was successfully created, False
            otherwise.
        """
        epoch = self.get_own_shard_range().epoch
        if not epoch:
            self.logger.warning("Container '%s' cannot be set to sharding "
                                "state: missing epoch", self.path)
            return False
        state = self.get_db_state()
        if not state == UNSHARDED:
            self.logger.warning("Container '%s' cannot be set to sharding "
                                "state while in %s state", self.path, state)
            return False

        info = self.get_info()
        # The tmp_dir is cleaned up by the replicators after reclaim_age, so
        # if we initially create the fresh DB there, we will already have
        # cleanup covered if there is an error.
        tmp_dir = os.path.join(self.get_device_path(), 'tmp')
        if not os.path.exists(tmp_dir):
            mkdirs(tmp_dir)
        tmp_db_file = os.path.join(tmp_dir, "fresh%s.db" % str(uuid4()))
        fresh_broker = ContainerBroker(tmp_db_file, self.timeout, self.logger,
                                       self.account, self.container)
        fresh_broker.initialize(info['put_timestamp'],
                                info['storage_policy_index'])
        # copy relevant data from the retiring db to the fresh db
        fresh_broker.update_metadata(self.metadata)
        fresh_broker.merge_shard_ranges(self.get_all_shard_range_data())
        # copy sync points so that any peer in sync with the retiring db will
        # appear to be in sync with the fresh db, although the peer shouldn't
        # attempt to replicate objects to a db with shard ranges.
        for incoming in (True, False):
            syncs = self.get_syncs(incoming)
            fresh_broker.merge_syncs(syncs, incoming)

        max_row = self.get_max_row()
        with fresh_broker.get() as fresh_broker_conn:
            # Initialise the rowid to continue from where the retiring db
            # ended
            try:
                sql = "INSERT into object " \
                    "(ROWID, name, created_at, size, content_type, etag) " \
                    "values (?, 'tmp_sharding', ?, 0, '', ?)"
                fresh_broker_conn.execute(
                    sql, (max_row, Timestamp.now().internal,
                          MD5_OF_EMPTY_STRING))
                fresh_broker_conn.execute(
                    'DELETE FROM object WHERE ROWID = ?', (max_row,))
                fresh_broker_conn.commit()
            except sqlite3.OperationalError as err:
                self.logger.error(
                    'Failed to set the ROWID of the fresh database for '
                    '%s: %s', self.path, err)
                return False

            # Sync the retiring container stat into the fresh db: at least
            # the things that aren't covered through the normal broker api
            # and that won't just be regenerated.
            try:
                sql = 'UPDATE container_stat SET created_at=?, '
                sql += 'delete_timestamp=?, status=?, status_changed_at=?'
                sql_data = (info['created_at'], info['delete_timestamp'],
                            info['status'], info['status_changed_at'])
                # 'reported_*' items are not sync'd because this is consistent
                # with when a new DB is created after rsync'ing to another
                # node (see _newid()). 'hash' should not be sync'd because
                # this DB has no object rows.
                fresh_broker_conn.execute(sql, sql_data)
                fresh_broker_conn.commit()
            except sqlite3.OperationalError as err:
                self.logger.error(
                    'Failed to sync the container_stat table/view with the '
                    'fresh database for %s: %s', self.path, err)
                return False

        # Rename to the new database
        fresh_db_filename = make_db_file_path(self._db_file, epoch)
        renamer(tmp_db_file, fresh_db_filename)
        self.reload_db_files()
        return True
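
    # Sketch of the db file layout through the sharding lifecycle (the hash
    # and epoch placeholders below are purely illustrative):
    #
    #   UNSHARDED: <hash>.db
    #   SHARDING:  <hash>.db + <hash>_<epoch>.db  (after set_sharding_state)
    #   SHARDED:   <hash>_<epoch>.db              (after set_sharded_state)
    #
    # make_db_file_path() appends the own shard range's epoch to the fresh
    # db's filename so that get_db_files()/reload_db_files() can order the
    # retiring and fresh dbs.
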
    def set_sharded_state(self):
        """
        Unlinks the broker's retiring DB file.

        :return: True if the retiring DB was successfully unlinked, False
            otherwise.
        """
        state = self.get_db_state()
        if not state == SHARDING:
            self.logger.warning("Container %r cannot be set to sharded "
                                "state while in %s state",
                                self.path, state)
            return False

        self.reload_db_files()
        if len(self.db_files) < 2:
            self.logger.warning(
                'Refusing to delete db file for %r: no fresher db file found '
                'in %r.', self.path, self.db_files)
            return False

        retiring_file = self.db_files[-2]
        try:
            os.unlink(retiring_file)
            self.logger.debug('Unlinked retiring db %r', retiring_file)
        except OSError as err:
            if err.errno != errno.ENOENT:
                self.logger.exception('Failed to unlink %r' % retiring_file)
            return False

        self.reload_db_files()
        if len(self.db_files) >= 2:
            self.logger.warning(
                'Still have multiple db files after unlinking %r: %r',
                retiring_file, self.db_files)
            return False

        return True

    def get_brokers(self):
        """
        Return a list of brokers for component dbs. The list has two entries
        while the db state is sharding: the first entry is a broker for the
        retiring db with ``skip_commits`` set to ``True``; the second entry is
        a broker for the fresh db with ``skip_commits`` set to ``False``. For
        any other db state the list has one entry.

        :return: a list of :class:`~swift.container.backend.ContainerBroker`
        """
        if len(self.db_files) > 2:
            self.logger.warning('Unexpected db files will be ignored: %s' %
                                self.db_files[:-2])
        brokers = []
        db_files = self.db_files[-2:]
        while db_files:
            db_file = db_files.pop(0)
            sub_broker = ContainerBroker(
                db_file, self.timeout, self.logger, self.account,
                self.container, self.pending_timeout, self.stale_reads_ok,
                force_db_file=True, skip_commits=bool(db_files))
            brokers.append(sub_broker)
        return brokers

    def set_sharding_sysmeta(self, key, value):
        """
        Updates the broker's metadata stored under the given key prefixed with
        a sharding specific namespace.

        :param key: metadata key in the sharding metadata namespace.
        :param value: metadata value
        """
        self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
                              (value, Timestamp.now().internal)})

    def get_sharding_sysmeta_with_timestamps(self):
        """
        Returns sharding specific info from the broker's metadata with
        timestamps.

        :return: a dict of sharding info with their timestamps.
        """
        prefix = 'X-Container-Sysmeta-Shard-'
        return {
            k[len(prefix):]: v
            for k, v in self.metadata.items()
            if k.startswith(prefix)
        }

    def get_sharding_sysmeta(self, key=None):
        """
        Returns sharding specific info from the broker's metadata.

        :param key: if given the value stored under ``key`` in the sharding
            info will be returned.
        :return: either a dict of sharding info or the value stored under
            ``key`` in that dict.
        """
        info = self.get_sharding_sysmeta_with_timestamps()
        if key:
            return info.get(key, (None, None))[0]
        else:
            return {k: v[0] for k, v in info.items()}
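
    # Illustrative only: the account/container values are made up and
    # ``quote`` would come from six.moves.urllib.parse (it is not imported in
    # this module). A shard container typically learns where its root lives
    # via sharding sysmeta, e.g.
    #
    #   shard_broker.set_sharding_sysmeta(
    #       'Quoted-Root', quote('AUTH_test/root_container'))
    #   shard_broker.root_path          # -> 'AUTH_test/root_container'
    #   shard_broker.is_root_container()  # -> False for a shard
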
    def _get_root_meta(self):
        """
        Get the (unquoted) root path, plus the header the info came from.
        If no info is available, returns ``(None, None)``.
        """
        path = self.get_sharding_sysmeta('Quoted-Root')
        if path:
            return 'X-Container-Sysmeta-Shard-Quoted-Root', unquote(path)

        path = self.get_sharding_sysmeta('Root')
        if path:
            return 'X-Container-Sysmeta-Shard-Root', path

        return None, None

    def _load_root_info(self):
        """
        Load the root container name and account for the container
        represented by this broker.

        The root container path, if set, is stored in sysmeta under the key
        ``X-Container-Sysmeta-Shard-Quoted-Root`` or the legacy key
        ``X-Container-Sysmeta-Shard-Root``. If neither is set then the
        container is considered to be a root container and ``_root_account``
        and ``_root_container`` are set equal to the broker ``account`` and
        ``container`` attributes respectively.
        """
        hdr, path = self._get_root_meta()

        if not path:
            # Ensure account/container get populated
            self._populate_instance_cache()
            self._root_account = self.account
            self._root_container = self.container
            return

        try:
            self._root_account, self._root_container = split_path(
                '/' + path, 2, 2)
        except ValueError:
            raise ValueError("Expected %s to be of the form "
                             "'account/container', got %r" % (hdr, path))

    @property
    def root_account(self):
        if not self._root_account:
            self._load_root_info()
        return self._root_account

    @property
    def root_container(self):
        if not self._root_container:
            self._load_root_info()
        return self._root_container

    @property
    def root_path(self):
        return '%s/%s' % (self.root_account, self.root_container)

    def is_root_container(self):
        """
        Returns True if this container is a root container, False otherwise.

        A root container is a container that is not a shard of another
        container.
        """
        _, path = self._get_root_meta()
        if path is not None:
            # We have metadata telling us where the root is; it's
            # authoritative; shards should always have this metadata even when
            # deleted
            return self.path == path

        # Else, we're either a root or a legacy deleted shard whose sharding
        # sysmeta was deleted
        own_shard_range = self.get_own_shard_range(no_default=True)
        if not own_shard_range:
            return True  # Never been sharded
        if own_shard_range.deleted:
            # When shard ranges shrink, they get marked deleted
            return False
        else:
            # But even when a root collapses, empties, and gets deleted, its
            # own_shard_range is left alive
            return True

    def _get_next_shard_range_upper(self, shard_size, last_upper=None):
        """
        Returns the name of the object that is ``shard_size`` rows beyond
        ``last_upper`` in the object table ordered by name. If ``last_upper``
        is not given then it defaults to the start of the object table,
        ordered by name.

        :param shard_size: the number of rows in each shard range
        :param last_upper: the upper bound of the last found shard range.
        :return: an object name, or None if the number of rows beyond
            ``last_upper`` is less than ``shard_size``.
        """
        self._commit_puts_stale_ok()
        with self.get() as connection:
            sql = ('SELECT name FROM object WHERE %s=0 ' %
                   self._get_deleted_key(connection))
            args = []
            if last_upper:
                sql += "AND name > ? "
                args.append(str(last_upper))
            sql += "ORDER BY name LIMIT 1 OFFSET %d" % (shard_size - 1)
            row = connection.execute(sql, args).fetchone()
            return row['name'] if row else None
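
    # For example (values are illustrative), with shard_size=100 and
    # last_upper='cat' the query built above amounts to:
    #
    #   SELECT name FROM object WHERE deleted=0 AND name > 'cat'
    #   ORDER BY name LIMIT 1 OFFSET 99
    #
    # i.e. the name of the 100th undeleted object beyond 'cat', which becomes
    # the candidate upper bound of the next shard range.
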
    def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None,
                          minimum_shard_size=1):
        """
        Scans the container db for shard ranges. Scanning will start at the
        upper bound of any ``existing_ranges`` that are given, otherwise at
        ``ShardRange.MIN``. Scanning will stop when ``limit`` shard ranges
        have been found or when no more shard ranges can be found. In the
        latter case, the upper bound of the final shard range will be equal
        to the upper bound of the container namespace.

        This method does not modify the state of the db; callers are
        responsible for persisting any shard range data in the db.

        :param shard_size: the size of each shard range
        :param limit: the maximum number of shard points to be found; a
            negative value (default) implies no limit.
        :param existing_ranges: an optional list of existing ShardRanges; if
            given, this list should be sorted in order of upper bounds; the
            scan for new shard ranges will start at the upper bound of the
            last existing ShardRange.
        :param minimum_shard_size: the minimum size of the final shard range.
            If this is greater than one then the final shard range may be
            extended to more than ``shard_size`` in order to avoid a further
            shard range with fewer than ``minimum_shard_size`` rows.
        :return: a tuple; the first value in the tuple is a list of dicts each
            having keys {'index', 'lower', 'upper', 'object_count'} in order
            of ascending 'upper'; the second value in the tuple is a boolean
            which is True if the last shard range has been found, False
            otherwise.
        """
        existing_ranges = existing_ranges or []
        minimum_shard_size = max(minimum_shard_size, 1)
        object_count = self.get_info().get('object_count', 0)
        if shard_size + minimum_shard_size > object_count:
            # container not big enough to shard
            return [], False

        own_shard_range = self.get_own_shard_range()
        progress = 0
        progress_reliable = True
        # update initial state to account for any existing shard ranges
        if existing_ranges:
            if all([sr.state == ShardRange.FOUND for sr in existing_ranges]):
                progress = sum([sr.object_count for sr in existing_ranges])
            else:
                # object count in existing shard ranges may have changed since
                # they were found so progress cannot be reliably calculated;
                # use default progress of zero - that's ok, progress is used
                # for optimisation not correctness
                progress_reliable = False
            last_shard_upper = existing_ranges[-1].upper
            if last_shard_upper >= own_shard_range.upper:
                # == implies all ranges were previously found
                # > implies an acceptor range has been set into which this
                # shard should cleave itself
                return [], True
        else:
            last_shard_upper = own_shard_range.lower

        found_ranges = []
        sub_broker = self.get_brokers()[0]
        index = len(existing_ranges)
        while limit is None or limit < 0 or len(found_ranges) < limit:
            if progress + shard_size + minimum_shard_size > object_count:
                # next shard point is within minimum_shard_size rows of the
                # final object name, or beyond it, so don't bother with a db
                # query. This shard will have <= shard_size +
                # (minimum_shard_size - 1) rows.
                next_shard_upper = None
            else:
                try:
                    next_shard_upper = sub_broker._get_next_shard_range_upper(
                        shard_size, last_shard_upper)
                except (sqlite3.OperationalError, LockTimeout):
                    self.logger.exception(
                        "Problem finding shard upper in %r: " % self.db_file)
                    break

            if (next_shard_upper is None or
                    next_shard_upper > own_shard_range.upper):
                # We reached the end of the container namespace, or possibly
                # beyond if the container has misplaced objects. In either
                # case limit the final shard range to own_shard_range.upper.
                next_shard_upper = own_shard_range.upper
                if progress_reliable:
                    # object count may include misplaced objects so the final
                    # shard size may not be accurate until cleaved, but at
                    # least the sum of shard sizes will equal the unsharded
                    # object_count
                    shard_size = object_count - progress

            # NB shard ranges are created with a non-zero object count for a
            # few reasons:
            #   1. so that the apparent container object count remains
            #      consistent;
            #   2. the container is non-deletable while shards have been found
            #      but not yet cleaved; and
            #   3. so that we have a rough idea of how big the shards should
            #      be while cleaving.
            found_ranges.append(
                {'index': index,
                 'lower': str(last_shard_upper),
                 'upper': str(next_shard_upper),
                 'object_count': shard_size})

            if next_shard_upper == own_shard_range.upper:
                return found_ranges, True

            progress += shard_size
            last_shard_upper = next_shard_upper
            index += 1

        return found_ranges, False
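
    # A minimal sketch of a scan (all numbers and object names are made up):
    # for an unsharded container with object_count=250, calling
    #
    #   found, last = broker.find_shard_ranges(shard_size=100)
    #
    # would yield something like
    #
    #   found = [
    #       {'index': 0, 'lower': '', 'upper': 'obj0099',
    #        'object_count': 100},
    #       {'index': 1, 'lower': 'obj0099', 'upper': 'obj0199',
    #        'object_count': 100},
    #       {'index': 2, 'lower': 'obj0199', 'upper': '',
    #        'object_count': 50},
    #   ]
    #   last = True
    #
    # where the final range absorbs the remaining 50 rows and its upper bound
    # is the container's own (unbounded) upper bound.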