summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-25 05:12:33 +0000
committerGerrit Code Review <review@openstack.org>2014-09-25 05:12:33 +0000
commit813cc0fe0d05c018bb356c7b00b191949c604347 (patch)
tree8a963be86f800c7c2761e1df5da095ff9df09eae
parent0ac869efa49b86a3ec45cebc6a160a252af32779 (diff)
parent15fbf9fe7cf33ed4b56569078400a2ba070b6bce (diff)
downloadswift-813cc0fe0d05c018bb356c7b00b191949c604347.tar.gz
Merge "Add container_count to policy_stat table"
-rw-r--r--swift/account/auditor.py35
-rw-r--r--swift/account/backend.py115
-rw-r--r--swift/common/exceptions.py4
-rw-r--r--test/unit/account/test_auditor.py135
-rw-r--r--test/unit/account/test_backend.py364
-rw-r--r--test/unit/account/test_server.py6
-rw-r--r--test/unit/account/test_utils.py61
7 files changed, 691 insertions, 29 deletions
diff --git a/swift/account/auditor.py b/swift/account/auditor.py
index af38ed3bd..261acf7e9 100644
--- a/swift/account/auditor.py
+++ b/swift/account/auditor.py
@@ -20,6 +20,7 @@ from random import random
import swift.common.db
from swift.account.backend import AccountBroker, DATADIR
+from swift.common.exceptions import InvalidAccountInfo
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
from swift.common.daemon import Daemon
@@ -30,9 +31,9 @@ from eventlet import Timeout
class AccountAuditor(Daemon):
"""Audit accounts."""
- def __init__(self, conf):
+ def __init__(self, conf, logger=None):
self.conf = conf
- self.logger = get_logger(conf, log_route='account-auditor')
+ self.logger = logger or get_logger(conf, log_route='account-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.interval = int(conf.get('interval', 1800))
@@ -104,6 +105,29 @@ class AccountAuditor(Daemon):
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
+ def validate_per_policy_counts(self, broker):
+ info = broker.get_info()
+ policy_stats = broker.get_policy_stats(do_migrations=True)
+ policy_totals = {
+ 'container_count': 0,
+ 'object_count': 0,
+ 'bytes_used': 0,
+ }
+ for policy_stat in policy_stats.values():
+ for key in policy_totals:
+ policy_totals[key] += policy_stat[key]
+
+ for key in policy_totals:
+ if policy_totals[key] == info[key]:
+ continue
+ raise InvalidAccountInfo(_(
+ 'The total %(key)s for the container (%(total)s) does not '
+ 'match the sum of %(key)s across policies (%(sum)s)') % {
+ 'key': key,
+ 'total': info[key],
+ 'sum': policy_totals[key],
+ })
+
def account_audit(self, path):
"""
Audits the given account path
@@ -114,10 +138,15 @@ class AccountAuditor(Daemon):
try:
broker = AccountBroker(path)
if not broker.is_deleted():
- broker.get_info()
+ self.validate_per_policy_counts(broker)
self.logger.increment('passes')
self.account_passes += 1
self.logger.debug('Audit passed for %s' % broker)
+ except InvalidAccountInfo as e:
+ self.logger.increment('failures')
+ self.account_failures += 1
+ self.logger.error(
+ _('Audit Failed for %s: %s'), path, str(e))
except (Exception, Timeout):
self.logger.increment('failures')
self.account_failures += 1
diff --git a/swift/account/backend.py b/swift/account/backend.py
index 1ad37c22c..89c6cfb65 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -32,17 +32,19 @@ POLICY_STAT_TRIGGER_SCRIPT = """
CREATE TRIGGER container_insert_ps AFTER INSERT ON container
BEGIN
INSERT OR IGNORE INTO policy_stat
- (storage_policy_index, object_count, bytes_used)
- VALUES (new.storage_policy_index, 0, 0);
+ (storage_policy_index, container_count, object_count, bytes_used)
+ VALUES (new.storage_policy_index, 0, 0, 0);
UPDATE policy_stat
- SET object_count = object_count + new.object_count,
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
bytes_used = bytes_used + new.bytes_used
WHERE storage_policy_index = new.storage_policy_index;
END;
CREATE TRIGGER container_delete_ps AFTER DELETE ON container
BEGIN
UPDATE policy_stat
- SET object_count = object_count - old.object_count,
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
bytes_used = bytes_used - old.bytes_used
WHERE storage_policy_index = old.storage_policy_index;
END;
@@ -165,13 +167,15 @@ class AccountBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
+ container_count INTEGER DEFAULT 0,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
INSERT OR IGNORE INTO policy_stat (
- storage_policy_index, object_count, bytes_used
+ storage_policy_index, container_count, object_count,
+ bytes_used
)
- SELECT 0, object_count, bytes_used
+ SELECT 0, container_count, object_count, bytes_used
FROM account_stat
WHERE container_count > 0;
""")
@@ -296,24 +300,45 @@ class AccountBroker(DatabaseBroker):
return row['status'] == "DELETED" or (
row['delete_timestamp'] > row['put_timestamp'])
- def get_policy_stats(self):
+ def get_policy_stats(self, do_migrations=False):
"""
Get global policy stats for the account.
+ :param do_migrations: boolean, if True the policy stat dicts will
+ always include the 'container_count' key;
+ otherwise it may be ommited on legacy databases
+ until they are migrated.
+
:returns: dict of policy stats where the key is the policy index and
the value is a dictionary like {'object_count': M,
- 'bytes_used': N}
+ 'bytes_used': N, 'container_count': L}
"""
- info = []
+ columns = [
+ 'storage_policy_index',
+ 'container_count',
+ 'object_count',
+ 'bytes_used',
+ ]
+
+ def run_query():
+ return (conn.execute('''
+ SELECT %s
+ FROM policy_stat
+ ''' % ', '.join(columns)).fetchall())
+
self._commit_puts_stale_ok()
+ info = []
with self.get() as conn:
try:
- info = (conn.execute('''
- SELECT storage_policy_index, object_count, bytes_used
- FROM policy_stat
- ''').fetchall())
+ info = run_query()
except sqlite3.OperationalError as err:
- if "no such table: policy_stat" not in str(err):
+ if "no such column: container_count" in str(err):
+ if do_migrations:
+ self._migrate_add_container_count(conn)
+ else:
+ columns.remove('container_count')
+ info = run_query()
+ elif "no such table: policy_stat" not in str(err):
raise
policy_stats = {}
@@ -501,10 +526,72 @@ class AccountBroker(DatabaseBroker):
self._migrate_add_storage_policy_index(conn)
_really_merge_items(conn)
+ def _migrate_add_container_count(self, conn):
+ """
+ Add the container_count column to the 'policy_stat' table and
+ update it
+
+ :param conn: DB connection object
+ """
+ # add the container_count column
+ curs = conn.cursor()
+ curs.executescript('''
+ DROP TRIGGER container_delete_ps;
+ DROP TRIGGER container_insert_ps;
+ ALTER TABLE policy_stat
+ ADD COLUMN container_count INTEGER DEFAULT 0;
+ ''' + POLICY_STAT_TRIGGER_SCRIPT)
+
+ # keep the simple case simple, if there's only one entry in the
+ # policy_stat table we just copy the total container count from the
+ # account_stat table
+
+ # if that triggers an update then the where changes <> 0 *would* exist
+ # and the insert or replace from the count subqueries won't execute
+
+ curs.executescript("""
+ UPDATE policy_stat
+ SET container_count = (
+ SELECT container_count
+ FROM account_stat)
+ WHERE (
+ SELECT COUNT(storage_policy_index)
+ FROM policy_stat
+ ) <= 1;
+
+ INSERT OR REPLACE INTO policy_stat (
+ storage_policy_index,
+ container_count,
+ object_count,
+ bytes_used
+ )
+ SELECT p.storage_policy_index,
+ c.count,
+ p.object_count,
+ p.bytes_used
+ FROM (
+ SELECT storage_policy_index,
+ COUNT(*) as count
+ FROM container
+ WHERE deleted = 0
+ GROUP BY storage_policy_index
+ ) c
+ JOIN policy_stat p
+ ON p.storage_policy_index = c.storage_policy_index
+ WHERE NOT EXISTS(
+ SELECT changes() as change
+ FROM policy_stat
+ WHERE change <> 0
+ );
+ """)
+ conn.commit()
+
def _migrate_add_storage_policy_index(self, conn):
"""
Add the storage_policy_index column to the 'container' table and
set up triggers, creating the policy_stat table if needed.
+
+ :param conn: DB connection object
"""
try:
self.create_policy_stat_table(conn)
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index e46df933b..856c48975 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -79,6 +79,10 @@ class DeviceUnavailable(SwiftException):
pass
+class InvalidAccountInfo(SwiftException):
+ pass
+
+
class PathNotDir(OSError):
pass
diff --git a/test/unit/account/test_auditor.py b/test/unit/account/test_auditor.py
index 499b44155..c79209bc0 100644
--- a/test/unit/account/test_auditor.py
+++ b/test/unit/account/test_auditor.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from collections import defaultdict
+import itertools
import unittest
import mock
import time
@@ -23,7 +25,11 @@ from shutil import rmtree
from eventlet import Timeout
from swift.account import auditor
-from test.unit import FakeLogger
+from swift.common.storage_policy import POLICIES
+from swift.common.utils import Timestamp
+from test.unit import debug_logger, patch_policies, with_tempdir
+from test.unit.account.test_backend import (
+ AccountBrokerPreTrackContainerCountSetup)
class FakeAccountBroker(object):
@@ -37,16 +43,22 @@ class FakeAccountBroker(object):
def get_info(self):
if self.file.startswith('fail'):
- raise ValueError
+ raise ValueError()
if self.file.startswith('true'):
- return 'ok'
+ return defaultdict(int)
+
+ def get_policy_stats(self, **kwargs):
+ if self.file.startswith('fail'):
+ raise ValueError()
+ if self.file.startswith('true'):
+ return defaultdict(int)
class TestAuditor(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_account_auditor')
- self.logger = FakeLogger()
+ self.logger = debug_logger()
rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
fnames = ['true1.db', 'true2.db', 'true3.db',
@@ -69,9 +81,7 @@ class TestAuditor(unittest.TestCase):
def sleep(self, sec):
self.times += 1
- if self.times < sleep_times:
- time.sleep(0.1)
- else:
+ if self.times >= sleep_times:
# stop forever by an error
raise ValueError()
@@ -79,7 +89,7 @@ class TestAuditor(unittest.TestCase):
return time.time()
conf = {}
- test_auditor = auditor.AccountAuditor(conf)
+ test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
with mock.patch('swift.account.auditor.time', FakeTime()):
def fake_audit_location_generator(*args, **kwargs):
@@ -106,7 +116,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_run_once(self):
conf = {}
- test_auditor = auditor.AccountAuditor(conf)
+ test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
def fake_audit_location_generator(*args, **kwargs):
files = os.listdir(self.testdir)
@@ -121,7 +131,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_one_audit_pass(self):
conf = {}
- test_auditor = auditor.AccountAuditor(conf)
+ test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
def fake_audit_location_generator(*args, **kwargs):
files = os.listdir(self.testdir)
@@ -138,7 +148,7 @@ class TestAuditor(unittest.TestCase):
@mock.patch('swift.account.auditor.AccountBroker', FakeAccountBroker)
def test_account_auditor(self):
conf = {}
- test_auditor = auditor.AccountAuditor(conf)
+ test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
files = os.listdir(self.testdir)
for f in files:
path = os.path.join(self.testdir, f)
@@ -146,5 +156,108 @@ class TestAuditor(unittest.TestCase):
self.assertEqual(test_auditor.account_failures, 2)
self.assertEqual(test_auditor.account_passes, 3)
+
+@patch_policies
+class TestAuditorRealBrokerMigration(
+ AccountBrokerPreTrackContainerCountSetup, unittest.TestCase):
+
+ def test_db_migration(self):
+ # add a few containers
+ policies = itertools.cycle(POLICIES)
+ num_containers = len(POLICIES) * 3
+ per_policy_container_counts = defaultdict(int)
+ for i in range(num_containers):
+ name = 'test-container-%02d' % i
+ policy = next(policies)
+ self.broker.put_container(name, next(self.ts),
+ 0, 0, 0, int(policy))
+ per_policy_container_counts[int(policy)] += 1
+
+ self.broker._commit_puts()
+ self.assertEqual(num_containers,
+ self.broker.get_info()['container_count'])
+
+ # still un-migrated
+ self.assertUnmigrated(self.broker)
+
+ # run auditor, and validate migration
+ conf = {'devices': self.tempdir, 'mount_check': False,
+ 'recon_cache_path': self.tempdir}
+ test_auditor = auditor.AccountAuditor(conf, logger=debug_logger())
+ test_auditor.run_once()
+
+ self.restore_account_broker()
+
+ broker = auditor.AccountBroker(self.db_path)
+ # go after rows directly to avoid unintentional migration
+ with broker.get() as conn:
+ rows = conn.execute('''
+ SELECT storage_policy_index, container_count
+ FROM policy_stat
+ ''').fetchall()
+ for policy_index, container_count in rows:
+ self.assertEqual(container_count,
+ per_policy_container_counts[policy_index])
+
+
+class TestAuditorRealBroker(unittest.TestCase):
+
+ def setUp(self):
+ self.logger = debug_logger()
+
+ @with_tempdir
+ def test_db_validate_fails(self, tempdir):
+ ts = (Timestamp(t).internal for t in itertools.count(int(time.time())))
+ db_path = os.path.join(tempdir, 'sda', 'accounts',
+ '0', '0', '0', 'test.db')
+ broker = auditor.AccountBroker(db_path, account='a')
+ broker.initialize(next(ts))
+ # add a few containers
+ policies = itertools.cycle(POLICIES)
+ num_containers = len(POLICIES) * 3
+ per_policy_container_counts = defaultdict(int)
+ for i in range(num_containers):
+ name = 'test-container-%02d' % i
+ policy = next(policies)
+ broker.put_container(name, next(ts), 0, 0, 0, int(policy))
+ per_policy_container_counts[int(policy)] += 1
+
+ broker._commit_puts()
+ self.assertEqual(broker.get_info()['container_count'], num_containers)
+
+ messed_up_policy = random.choice(list(POLICIES))
+
+ # now mess up a policy_stats table count
+ with broker.get() as conn:
+ conn.executescript('''
+ UPDATE policy_stat
+ SET container_count = container_count - 1
+ WHERE storage_policy_index = %d;
+ ''' % int(messed_up_policy))
+
+ # validate it's messed up
+ policy_stats = broker.get_policy_stats()
+ self.assertEqual(
+ policy_stats[int(messed_up_policy)]['container_count'],
+ per_policy_container_counts[int(messed_up_policy)] - 1)
+
+ # do an audit
+ conf = {'devices': tempdir, 'mount_check': False,
+ 'recon_cache_path': tempdir}
+ test_auditor = auditor.AccountAuditor(conf, logger=self.logger)
+ test_auditor.run_once()
+
+ # validate errors
+ self.assertEqual(test_auditor.account_failures, 1)
+ error_lines = test_auditor.logger.get_lines_for_level('error')
+ self.assertEqual(len(error_lines), 1)
+ error_message = error_lines[0]
+ self.assert_(broker.db_file in error_message)
+ self.assert_('container_count' in error_message)
+ self.assert_('does not match' in error_message)
+ self.assertEqual(test_auditor.logger.get_increment_counts(),
+ {'failures': 1})
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py
index 1fb65260a..40fac7673 100644
--- a/test/unit/account/test_backend.py
+++ b/test/unit/account/test_backend.py
@@ -15,6 +15,7 @@
""" Tests for swift.account.backend """
+from collections import defaultdict
import hashlib
import json
import unittest
@@ -656,9 +657,10 @@ class TestAccountBroker(unittest.TestCase):
put_timestamp, 0,
0, 0,
policy.idx)
-
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
+ if 'container_count' in stats:
+ self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
@@ -674,6 +676,8 @@ class TestAccountBroker(unittest.TestCase):
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
+ if 'container_count' in stats:
+ self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], count)
self.assertEqual(stats['bytes_used'], count)
@@ -681,6 +685,8 @@ class TestAccountBroker(unittest.TestCase):
for policy_index, stats in policy_stats.items():
policy = POLICIES[policy_index]
count = policy.idx * 100 # coupled with policy for test
+ if 'container_count' in stats:
+ self.assertEqual(stats['container_count'], 1)
self.assertEqual(stats['object_count'], count)
self.assertEqual(stats['bytes_used'], count)
@@ -695,6 +701,8 @@ class TestAccountBroker(unittest.TestCase):
policy_stats = broker.get_policy_stats()
stats = policy_stats[policy.idx]
+ if 'container_count' in stats:
+ self.assertEqual(stats['container_count'], 0)
self.assertEqual(stats['object_count'], 0)
self.assertEqual(stats['bytes_used'], 0)
@@ -714,8 +722,12 @@ class TestAccountBroker(unittest.TestCase):
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
+ if 'container_count' in stats[0]:
+ self.assertEqual(stats[0]['container_count'], 1)
self.assertEqual(stats[0]['object_count'], 13)
self.assertEqual(stats[0]['bytes_used'], 8156441)
+ if 'container_count' in stats[1]:
+ self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 8)
self.assertEqual(stats[1]['bytes_used'], 6085379)
@@ -1019,8 +1031,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
# we should have stats for both containers
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
+ if 'container_count' in stats[0]:
+ self.assertEqual(stats[0]['container_count'], 1)
self.assertEqual(stats[0]['object_count'], 1)
self.assertEqual(stats[0]['bytes_used'], 2)
+ if 'container_count' in stats[1]:
+ self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 3)
self.assertEqual(stats[1]['bytes_used'], 4)
@@ -1032,8 +1048,12 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
conn.commit()
stats = broker.get_policy_stats()
self.assertEqual(len(stats), 2)
+ if 'container_count' in stats[0]:
+ self.assertEqual(stats[0]['container_count'], 0)
self.assertEqual(stats[0]['object_count'], 0)
self.assertEqual(stats[0]['bytes_used'], 0)
+ if 'container_count' in stats[1]:
+ self.assertEqual(stats[1]['container_count'], 1)
self.assertEqual(stats[1]['object_count'], 3)
self.assertEqual(stats[1]['bytes_used'], 4)
@@ -1099,3 +1119,345 @@ class TestAccountBrokerBeforeSPI(TestAccountBroker):
with broker.get() as conn:
conn.execute('SELECT * FROM policy_stat')
conn.execute('SELECT storage_policy_index FROM container')
+
+
+def pre_track_containers_create_policy_stat(self, conn):
+ """
+ Copied from AccountBroker before the container_count column was
+ added.
+ Create policy_stat table which is specific to the account DB.
+ Not a part of Pluggable Back-ends, internal to the baseline code.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE policy_stat (
+ storage_policy_index INTEGER PRIMARY KEY,
+ object_count INTEGER DEFAULT 0,
+ bytes_used INTEGER DEFAULT 0
+ );
+ INSERT OR IGNORE INTO policy_stat (
+ storage_policy_index, object_count, bytes_used
+ )
+ SELECT 0, object_count, bytes_used
+ FROM account_stat
+ WHERE container_count > 0;
+ """)
+
+
+def pre_track_containers_create_container_table(self, conn):
+ """
+ Copied from AccountBroker before the container_count column was
+ added (using old stat trigger script)
+ Create container table which is specific to the account DB.
+
+ :param conn: DB connection object
+ """
+ # revert to old trigger script to support one of the tests
+ OLD_POLICY_STAT_TRIGGER_SCRIPT = """
+ CREATE TRIGGER container_insert_ps AFTER INSERT ON container
+ BEGIN
+ INSERT OR IGNORE INTO policy_stat
+ (storage_policy_index, object_count, bytes_used)
+ VALUES (new.storage_policy_index, 0, 0);
+ UPDATE policy_stat
+ SET object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used
+ WHERE storage_policy_index = new.storage_policy_index;
+ END;
+ CREATE TRIGGER container_delete_ps AFTER DELETE ON container
+ BEGIN
+ UPDATE policy_stat
+ SET object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used
+ WHERE storage_policy_index = old.storage_policy_index;
+ END;
+
+ """
+ conn.executescript("""
+ CREATE TABLE container (
+ ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT,
+ put_timestamp TEXT,
+ delete_timestamp TEXT,
+ object_count INTEGER,
+ bytes_used INTEGER,
+ deleted INTEGER DEFAULT 0,
+ storage_policy_index INTEGER DEFAULT 0
+ );
+
+ CREATE INDEX ix_container_deleted_name ON
+ container (deleted, name);
+
+ CREATE TRIGGER container_insert AFTER INSERT ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used,
+ hash = chexor(hash, new.name,
+ new.put_timestamp || '-' ||
+ new.delete_timestamp || '-' ||
+ new.object_count || '-' || new.bytes_used);
+ END;
+
+ CREATE TRIGGER container_update BEFORE UPDATE ON container
+ BEGIN
+ SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
+ END;
+
+
+ CREATE TRIGGER container_delete AFTER DELETE ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used,
+ hash = chexor(hash, old.name,
+ old.put_timestamp || '-' ||
+ old.delete_timestamp || '-' ||
+ old.object_count || '-' || old.bytes_used);
+ END;
+ """ + OLD_POLICY_STAT_TRIGGER_SCRIPT)
+
+
+class AccountBrokerPreTrackContainerCountSetup(object):
+ def assertUnmigrated(self, broker):
+ with broker.get() as conn:
+ try:
+ conn.execute('''
+ SELECT container_count FROM policy_stat
+ ''').fetchone()[0]
+ except sqlite3.OperationalError as err:
+ # confirm that the column really isn't there
+ self.assert_('no such column: container_count' in str(err))
+ else:
+ self.fail('broker did not raise sqlite3.OperationalError '
+ 'trying to select container_count from policy_stat!')
+
+ def setUp(self):
+ # use old version of policy_stat
+ self._imported_create_policy_stat_table = \
+ AccountBroker.create_policy_stat_table
+ AccountBroker.create_policy_stat_table = \
+ pre_track_containers_create_policy_stat
+ # use old container table so we use old trigger for
+ # updating policy_stat
+ self._imported_create_container_table = \
+ AccountBroker.create_container_table
+ AccountBroker.create_container_table = \
+ pre_track_containers_create_container_table
+
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(Timestamp('1').internal)
+ self.assertUnmigrated(broker)
+
+ self.tempdir = mkdtemp()
+ self.ts = (Timestamp(t).internal for t in itertools.count(int(time())))
+
+ self.db_path = os.path.join(self.tempdir, 'sda', 'accounts',
+ '0', '0', '0', 'test.db')
+ self.broker = AccountBroker(self.db_path, account='a')
+ self.broker.initialize(next(self.ts))
+
+ # Common sanity-check that our starting, pre-migration state correctly
+ # does not have the container_count column.
+ self.assertUnmigrated(self.broker)
+
+ def tearDown(self):
+ rmtree(self.tempdir, ignore_errors=True)
+
+ self.restore_account_broker()
+
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(Timestamp('1').internal)
+ with broker.get() as conn:
+ conn.execute('SELECT container_count FROM policy_stat')
+
+ def restore_account_broker(self):
+ AccountBroker.create_policy_stat_table = \
+ self._imported_create_policy_stat_table
+ AccountBroker.create_container_table = \
+ self._imported_create_container_table
+
+
+@patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True),
+ StoragePolicy(2, 'two', False),
+ StoragePolicy(3, 'three', False)])
+class TestAccountBrokerBeforePerPolicyContainerTrack(
+ AccountBrokerPreTrackContainerCountSetup, TestAccountBroker):
+ """
+ Tests for AccountBroker against databases created before
+ the container_count column was added to the policy_stat table.
+ """
+
+ def test_policy_table_cont_count_do_migrations(self):
+ # add a few containers
+ num_containers = 8
+ policies = itertools.cycle(POLICIES)
+ per_policy_container_counts = defaultdict(int)
+
+ # add a few container entries
+ for i in range(num_containers):
+ name = 'test-container-%02d' % i
+ policy = next(policies)
+ self.broker.put_container(name, next(self.ts),
+ 0, 0, 0, int(policy))
+ per_policy_container_counts[int(policy)] += 1
+
+ total_container_count = self.broker.get_info()['container_count']
+ self.assertEqual(total_container_count, num_containers)
+
+ # still un-migrated
+ self.assertUnmigrated(self.broker)
+
+ policy_stats = self.broker.get_policy_stats()
+ self.assertEqual(len(policy_stats), len(per_policy_container_counts))
+ for stats in policy_stats.values():
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+ # un-migrated dbs should not return container_count
+ self.assertFalse('container_count' in stats)
+
+ # now force the migration
+ policy_stats = self.broker.get_policy_stats(do_migrations=True)
+ self.assertEqual(len(policy_stats), len(per_policy_container_counts))
+ for policy_index, stats in policy_stats.items():
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+ self.assertEqual(stats['container_count'],
+ per_policy_container_counts[policy_index])
+
+ def test_policy_table_cont_count_update_get_stats(self):
+ # add a few container entries
+ for policy in POLICIES:
+ for i in range(0, policy.idx + 1):
+ container_name = 'c%s_0' % policy.idx
+ self.broker.put_container('c%s_%s' % (policy.idx, i),
+ 0, 0, 0, 0, policy.idx)
+ # _commit_puts_stale_ok() called by get_policy_stats()
+
+ # calling get_policy_stats() with do_migrations will alter the table
+ # and populate it based on what's in the container table now
+ stats = self.broker.get_policy_stats(do_migrations=True)
+
+ # now confirm that the column was created
+ with self.broker.get() as conn:
+ conn.execute('SELECT container_count FROM policy_stat')
+
+ # confirm stats reporting back correctly
+ self.assertEqual(len(stats), 4)
+ for policy in POLICIES:
+ self.assertEqual(stats[policy.idx]['container_count'],
+ policy.idx + 1)
+
+ # now delete one from each policy and check the stats
+ with self.broker.get() as conn:
+ for policy in POLICIES:
+ container_name = 'c%s_0' % policy.idx
+ conn.execute('''
+ DELETE FROM container
+ WHERE name = ?
+ ''', (container_name,))
+ conn.commit()
+ stats = self.broker.get_policy_stats()
+ self.assertEqual(len(stats), 4)
+ for policy in POLICIES:
+ self.assertEqual(stats[policy.idx]['container_count'],
+ policy.idx)
+
+ # now put them back and make sure things are still cool
+ for policy in POLICIES:
+ container_name = 'c%s_0' % policy.idx
+ self.broker.put_container(container_name, 0, 0, 0, 0, policy.idx)
+ # _commit_puts_stale_ok() called by get_policy_stats()
+
+ # confirm stats reporting back correctly
+ stats = self.broker.get_policy_stats()
+ self.assertEqual(len(stats), 4)
+ for policy in POLICIES:
+ self.assertEqual(stats[policy.idx]['container_count'],
+ policy.idx + 1)
+
+ def test_per_policy_cont_count_migration_with_deleted(self):
+ num_containers = 15
+ policies = itertools.cycle(POLICIES)
+ container_policy_map = {}
+
+ # add a few container entries
+ for i in range(num_containers):
+ name = 'test-container-%02d' % i
+ policy = next(policies)
+ self.broker.put_container(name, next(self.ts),
+ 0, 0, 0, int(policy))
+ # keep track of stub container policies
+ container_policy_map[name] = policy
+
+ # delete about half of the containers
+ for i in range(0, num_containers, 2):
+ name = 'test-container-%02d' % i
+ policy = container_policy_map[name]
+ self.broker.put_container(name, 0, next(self.ts),
+ 0, 0, int(policy))
+
+ total_container_count = self.broker.get_info()['container_count']
+ self.assertEqual(total_container_count, num_containers / 2)
+
+ # trigger migration
+ policy_info = self.broker.get_policy_stats(do_migrations=True)
+ self.assertEqual(len(policy_info), min(num_containers, len(POLICIES)))
+ policy_container_count = sum(p['container_count'] for p in
+ policy_info.values())
+ self.assertEqual(total_container_count, policy_container_count)
+
+ def test_per_policy_cont_count_migration_with_single_policy(self):
+ num_containers = 100
+
+ with patch_policies(legacy_only=True):
+ policy = POLICIES[0]
+ # add a few container entries
+ for i in range(num_containers):
+ name = 'test-container-%02d' % i
+ self.broker.put_container(name, next(self.ts),
+ 0, 0, 0, int(policy))
+ # delete about half of the containers
+ for i in range(0, num_containers, 2):
+ name = 'test-container-%02d' % i
+ self.broker.put_container(name, 0, next(self.ts),
+ 0, 0, int(policy))
+
+ total_container_count = self.broker.get_info()['container_count']
+ # trigger migration
+ policy_info = self.broker.get_policy_stats(do_migrations=True)
+
+ self.assertEqual(total_container_count, num_containers / 2)
+
+ self.assertEqual(len(policy_info), 1)
+ policy_container_count = sum(p['container_count'] for p in
+ policy_info.values())
+ self.assertEqual(total_container_count, policy_container_count)
+
+ def test_per_policy_cont_count_migration_impossible(self):
+ with patch_policies(legacy_only=True):
+ # add a container for the legacy policy
+ policy = POLICIES[0]
+ self.broker.put_container('test-legacy-container', next(self.ts),
+ 0, 0, 0, int(policy))
+
+ # now create an impossible situation by adding a container for a
+ # policy index that doesn't exist
+ non_existant_policy_index = int(policy) + 1
+ self.broker.put_container('test-non-existant-policy',
+ next(self.ts), 0, 0, 0,
+ non_existant_policy_index)
+
+ total_container_count = self.broker.get_info()['container_count']
+
+ # trigger migration
+ policy_info = self.broker.get_policy_stats(do_migrations=True)
+
+ self.assertEqual(total_container_count, 2)
+ self.assertEqual(len(policy_info), 2)
+ for policy_stat in policy_info.values():
+ self.assertEqual(policy_stat['container_count'], 1)
diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py
index e515b3d22..c18c57edb 100644
--- a/test/unit/account/test_server.py
+++ b/test/unit/account/test_server.py
@@ -1708,6 +1708,9 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
POLICIES[0].name], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
+ POLICIES[0].name], '1')
def test_policy_stats_non_default(self):
ts = itertools.count()
@@ -1743,6 +1746,9 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(
resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
policy.name], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Container-Count' %
+ policy.name], '1')
def test_empty_policy_stats(self):
ts = itertools.count()
diff --git a/test/unit/account/test_utils.py b/test/unit/account/test_utils.py
index 46f8835bf..ea90decfc 100644
--- a/test/unit/account/test_utils.py
+++ b/test/unit/account/test_utils.py
@@ -117,9 +117,70 @@ class TestAccountUtils(unittest.TestCase):
})
for policy in POLICIES:
prefix = 'X-Account-Storage-Policy-%s-' % policy.name
+ expected[prefix + 'Container-Count'] = 1
expected[prefix + 'Object-Count'] = int(policy)
expected[prefix + 'Bytes-Used'] = int(policy) * 10
resp_headers = utils.get_response_headers(broker)
+ per_policy_container_headers = [
+ h for h in resp_headers if
+ h.lower().startswith('x-account-storage-policy-') and
+ h.lower().endswith('-container-count')]
+ self.assertTrue(per_policy_container_headers)
+ for key, value in resp_headers.items():
+ expected_value = expected.pop(key)
+ self.assertEqual(expected_value, str(value),
+ 'value for %r was %r not %r' % (
+ key, value, expected_value))
+ self.assertFalse(expected)
+
+ @patch_policies
+ def test_get_response_headers_with_legacy_data(self):
+ broker = backend.AccountBroker(':memory:', account='a')
+ now = time.time()
+ with mock.patch('time.time', new=lambda: now):
+ broker.initialize(Timestamp(now).internal)
+ # add some container data
+ ts = (Timestamp(t).internal for t in itertools.count(int(now)))
+ total_containers = 0
+ total_objects = 0
+ total_bytes = 0
+ for policy in POLICIES:
+ delete_timestamp = ts.next()
+ put_timestamp = ts.next()
+ object_count = int(policy)
+ bytes_used = int(policy) * 10
+ broker.put_container('c-%s' % policy.name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ int(policy))
+ total_containers += 1
+ total_objects += object_count
+ total_bytes += bytes_used
+ expected = HeaderKeyDict({
+ 'X-Account-Container-Count': total_containers,
+ 'X-Account-Object-Count': total_objects,
+ 'X-Account-Bytes-Used': total_bytes,
+ 'X-Timestamp': Timestamp(now).normal,
+ 'X-PUT-Timestamp': Timestamp(now).normal,
+ })
+ for policy in POLICIES:
+ prefix = 'X-Account-Storage-Policy-%s-' % policy.name
+ expected[prefix + 'Object-Count'] = int(policy)
+ expected[prefix + 'Bytes-Used'] = int(policy) * 10
+ orig_policy_stats = broker.get_policy_stats
+
+ def stub_policy_stats(*args, **kwargs):
+ policy_stats = orig_policy_stats(*args, **kwargs)
+ for stats in policy_stats.values():
+ # legacy db's won't return container_count
+ del stats['container_count']
+ return policy_stats
+ broker.get_policy_stats = stub_policy_stats
+ resp_headers = utils.get_response_headers(broker)
+ per_policy_container_headers = [
+ h for h in resp_headers if
+ h.lower().startswith('x-account-storage-policy-') and
+ h.lower().endswith('-container-count')]
+ self.assertFalse(per_policy_container_headers)
for key, value in resp_headers.items():
expected_value = expected.pop(key)
self.assertEqual(expected_value, str(value),