summaryrefslogtreecommitdiff
path: root/swift/container
diff options
context:
space:
mode:
Diffstat (limited to 'swift/container')
-rw-r--r--swift/container/backend.py70
-rw-r--r--swift/container/server.py44
-rw-r--r--swift/container/sharder.py17
3 files changed, 91 insertions, 40 deletions
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 0a18fe48f..bdf34f7d8 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
get_db_files, parse_db_filename, make_db_file_path, split_path, \
RESERVED_BYTE
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
- zero_like, DatabaseAlreadyExists
-
-SQLITE_ARG_LIMIT = 999
+ zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
DATADIR = 'containers'
@@ -62,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
'bytes_used', 'meta_timestamp', 'deleted', 'state',
- 'state_timestamp', 'epoch')
+ 'state_timestamp', 'epoch', 'reported')
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
@@ -269,6 +267,7 @@ def merge_shards(shard_data, existing):
if existing['timestamp'] < shard_data['timestamp']:
# note that currently we do not roll forward any meta or state from
# an item that was created at older time, newer created time trumps
+ shard_data['reported'] = 0 # reset the latch
return True
elif existing['timestamp'] > shard_data['timestamp']:
return False
@@ -285,6 +284,18 @@ def merge_shards(shard_data, existing):
else:
new_content = True
+ # We can latch the reported flag
+ if existing['reported'] and \
+ existing['object_count'] == shard_data['object_count'] and \
+ existing['bytes_used'] == shard_data['bytes_used'] and \
+ existing['state'] == shard_data['state'] and \
+ existing['epoch'] == shard_data['epoch']:
+ shard_data['reported'] = 1
+ else:
+ shard_data.setdefault('reported', 0)
+ if shard_data['reported'] and not existing['reported']:
+ new_content = True
+
if (existing['state_timestamp'] == shard_data['state_timestamp']
and shard_data['state'] > existing['state']):
new_content = True
@@ -597,7 +608,8 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
- epoch TEXT
+ epoch TEXT,
+ reported INTEGER DEFAULT 0
);
""" % SHARD_RANGE_TABLE)
@@ -1430,10 +1442,13 @@ class ContainerBroker(DatabaseBroker):
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
- if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
- raise
- self.create_shard_range_table(conn)
- return _really_merge_items(conn)
+ if 'no such column: reported' in str(err):
+ self._migrate_add_shard_range_reported(conn)
+ return _really_merge_items(conn)
+ if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
+ self.create_shard_range_table(conn)
+ return _really_merge_items(conn)
+ raise
def get_reconciler_sync(self):
with self.get() as conn:
@@ -1581,9 +1596,20 @@ class ContainerBroker(DatabaseBroker):
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
- def _reclaim(self, conn, age_timestamp, sync_timestamp):
- super(ContainerBroker, self)._reclaim(conn, age_timestamp,
- sync_timestamp)
+ def _migrate_add_shard_range_reported(self, conn):
+ """
+ Add the reported column to the 'shard_range' table.
+ """
+ conn.executescript('''
+ BEGIN;
+ ALTER TABLE %s
+ ADD COLUMN reported INTEGER DEFAULT 0;
+ COMMIT;
+ ''' % SHARD_RANGE_TABLE)
+
+ def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+ super(ContainerBroker, self)._reclaim_other_stuff(
+ conn, age_timestamp, sync_timestamp)
# populate instance cache, but use existing conn to avoid deadlock
# when it has a pending update
self._populate_instance_cache(conn=conn)
@@ -1630,7 +1656,7 @@ class ContainerBroker(DatabaseBroker):
elif states is not None:
included_states.add(states)
- def do_query(conn):
+ def do_query(conn, use_reported_column=True):
condition = ''
conditions = []
params = []
@@ -1648,21 +1674,27 @@ class ContainerBroker(DatabaseBroker):
params.append(self.path)
if conditions:
condition = ' WHERE ' + ' AND '.join(conditions)
+ if use_reported_column:
+ columns = SHARD_RANGE_KEYS
+ else:
+ columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
sql = '''
SELECT %s
FROM %s%s;
- ''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition)
+ ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition)
data = conn.execute(sql, params)
data.row_factory = None
return [row for row in data]
- try:
- with self.maybe_get(connection) as conn:
+ with self.maybe_get(connection) as conn:
+ try:
return do_query(conn)
- except sqlite3.OperationalError as err:
- if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
+ except sqlite3.OperationalError as err:
+ if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
+ return []
+ if 'no such column: reported' in str(err):
+ return do_query(conn, use_reported_column=False)
raise
- return []
@classmethod
def resolve_shard_range_states(cls, states):
diff --git a/swift/container/server.py b/swift/container/server.py
index c8d7647aa..db9ac0291 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -155,6 +155,8 @@ class ContainerController(BaseStorageServer):
conf['auto_create_account_prefix']
else:
self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
+ self.shards_account_prefix = (
+ self.auto_create_account_prefix + 'shards_')
if config_true_value(conf.get('allow_versions', 'f')):
self.save_headers.append('x-versions-location')
if 'allow_versions' in conf:
@@ -375,14 +377,12 @@ class ContainerController(BaseStorageServer):
# auto create accounts)
obj_policy_index = self.get_and_validate_policy_index(req) or 0
broker = self._get_container_broker(drive, part, account, container)
- if account.startswith(self.auto_create_account_prefix) and obj and \
- not os.path.exists(broker.db_file):
- try:
- broker.initialize(req_timestamp.internal, obj_policy_index)
- except DatabaseAlreadyExists:
- pass
- if not os.path.exists(broker.db_file):
+ if obj:
+ self._maybe_autocreate(broker, req_timestamp, account,
+ obj_policy_index, req)
+ elif not os.path.exists(broker.db_file):
return HTTPNotFound()
+
if obj: # delete object
# redirect if a shard range exists for the object name
redirect = self._redirect_to_shard(req, broker, obj)
@@ -449,11 +449,25 @@ class ContainerController(BaseStorageServer):
broker.update_status_changed_at(timestamp)
return recreated
+ def _should_autocreate(self, account, req):
+ auto_create_header = req.headers.get('X-Backend-Auto-Create')
+ if auto_create_header:
+ # If the caller included an explicit X-Backend-Auto-Create header,
+ # assume they know the behavior they want
+ return config_true_value(auto_create_header)
+ if account.startswith(self.shards_account_prefix):
+ # we have to specical case this subset of the
+ # auto_create_account_prefix because we don't want the updater
+ # accidently auto-creating shards; only the sharder creates
+ # shards and it will explicitly tell the server to do so
+ return False
+ return account.startswith(self.auto_create_account_prefix)
+
def _maybe_autocreate(self, broker, req_timestamp, account,
- policy_index):
+ policy_index, req):
created = False
- if account.startswith(self.auto_create_account_prefix) and \
- not os.path.exists(broker.db_file):
+ should_autocreate = self._should_autocreate(account, req)
+ if should_autocreate and not os.path.exists(broker.db_file):
if policy_index is None:
raise HTTPBadRequest(
'X-Backend-Storage-Policy-Index header is required')
@@ -506,8 +520,8 @@ class ContainerController(BaseStorageServer):
# obj put expects the policy_index header, default is for
# legacy support during upgrade.
obj_policy_index = requested_policy_index or 0
- self._maybe_autocreate(broker, req_timestamp, account,
- obj_policy_index)
+ self._maybe_autocreate(
+ broker, req_timestamp, account, obj_policy_index, req)
# redirect if a shard exists for this object name
response = self._redirect_to_shard(req, broker, obj)
if response:
@@ -531,8 +545,8 @@ class ContainerController(BaseStorageServer):
for sr in json.loads(req.body)]
except (ValueError, KeyError, TypeError) as err:
return HTTPBadRequest('Invalid body: %r' % err)
- created = self._maybe_autocreate(broker, req_timestamp, account,
- requested_policy_index)
+ created = self._maybe_autocreate(
+ broker, req_timestamp, account, requested_policy_index, req)
self._update_metadata(req, broker, req_timestamp, 'PUT')
if shard_ranges:
# TODO: consider writing the shard ranges into the pending
@@ -805,7 +819,7 @@ class ContainerController(BaseStorageServer):
requested_policy_index = self.get_and_validate_policy_index(req)
broker = self._get_container_broker(drive, part, account, container)
self._maybe_autocreate(broker, req_timestamp, account,
- requested_policy_index)
+ requested_policy_index, req)
try:
objs = json.load(req.environ['wsgi.input'])
except ValueError as err:
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index d9aa7c66d..dd33043ae 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -618,7 +618,8 @@ class ContainerSharder(ContainerReplicator):
def _send_shard_ranges(self, account, container, shard_ranges,
headers=None):
- body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
+ body = json.dumps([dict(sr, reported=0)
+ for sr in shard_ranges]).encode('ascii')
part, nodes = self.ring.get_nodes(account, container)
headers = headers or {}
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
@@ -1148,7 +1149,8 @@ class ContainerSharder(ContainerReplicator):
'X-Backend-Storage-Policy-Index': broker.storage_policy_index,
'X-Container-Sysmeta-Shard-Quoted-Root': quote(
broker.root_path),
- 'X-Container-Sysmeta-Sharding': True}
+ 'X-Container-Sysmeta-Sharding': 'True',
+ 'X-Backend-Auto-Create': 'True'}
# NB: we *used* to send along
# 'X-Container-Sysmeta-Shard-Root': broker.root_path
# but that isn't safe for container names with nulls or newlines
@@ -1468,7 +1470,7 @@ class ContainerSharder(ContainerReplicator):
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
- if not own_shard_range:
+ if not own_shard_range or own_shard_range.reported:
return
# persist the reported shard metadata
@@ -1478,9 +1480,12 @@ class ContainerSharder(ContainerReplicator):
include_own=True,
include_deleted=True)
# send everything
- self._send_shard_ranges(
- broker.root_account, broker.root_container,
- shard_ranges)
+ if self._send_shard_ranges(
+ broker.root_account, broker.root_container, shard_ranges):
+ # on success, mark ourselves as reported so we don't keep
+ # hammering the root
+ own_shard_range.reported = True
+ broker.merge_shard_ranges(own_shard_range)
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated