summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Luse <paul.e.luse@intel.com>2014-05-27 19:02:52 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2014-06-18 20:57:09 -0700
commit00a162c4d42a0f7813058d2ae5c3208b4f7a3157 (patch)
treebacb30f96563b7e2f3a34626e13ed69e725c28fa
parent81bc31e6ecebb63deb33406b342aebc9e44d3c51 (diff)
downloadswift-00a162c4d42a0f7813058d2ae5c3208b4f7a3157.tar.gz
Add Storage Policy Support to Accounts
This change updates the account HEAD handler to report out per policy object and byte usage for the account. Cumulative values are still reported and policy names are used in the report out (unless request is sent to an account server directly in which case policy indexes are used for easier accounting). Below is an example of the relevant HEAD response for a cluster with 3 policies and just a few small objects: X-Account-Container-Count: 3 X-Account-Object-Count: 3 X-Account-Bytes-Used: 21 X-Storage-Policy-Bronze-Object-Count: 1 X-Storage-Policy-Bronze-Bytes-Used: 7 X-Storage-Policy-Silver-Object-Count: 1 X-Storage-Policy-Silver-Bytes-Used: 7 X-Storage-Policy-Gold-Object-Count: 1 X-Storage-Policy-Gold-Bytes-Used: 7 Set a DEFAULT storage_policy_index for existing container rows during migration. Copy existing object_count and bytes_used in policy_stat table during migration. DocImpact Implements: blueprint storage-policies Change-Id: I5ec251f9a8014dd89764340de927d09466c72221
-rw-r--r--swift/account/backend.py149
-rw-r--r--swift/account/server.py18
-rw-r--r--swift/account/utils.py28
-rw-r--r--swift/container/server.py1
-rw-r--r--swift/container/updater.py9
-rw-r--r--test/unit/account/test_backend.py563
-rw-r--r--test/unit/account/test_replicator.py130
-rw-r--r--test/unit/account/test_server.py181
-rw-r--r--test/unit/container/test_server.py2
9 files changed, 994 insertions, 87 deletions
diff --git a/swift/account/backend.py b/swift/account/backend.py
index 1a940c49e..815ccd5ef 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -31,6 +31,28 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
DATADIR = 'accounts'
+POLICY_STAT_TRIGGER_SCRIPT = """
+ CREATE TRIGGER container_insert_ps AFTER INSERT ON container
+ BEGIN
+ INSERT OR IGNORE INTO policy_stat
+ (storage_policy_index, object_count, bytes_used)
+ VALUES (new.storage_policy_index, 0, 0);
+ UPDATE policy_stat
+ SET object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used
+ WHERE storage_policy_index = new.storage_policy_index;
+ END;
+ CREATE TRIGGER container_delete_ps AFTER DELETE ON container
+ BEGIN
+ UPDATE policy_stat
+ SET object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used
+ WHERE storage_policy_index = old.storage_policy_index;
+ END;
+
+"""
+
+
class AccountBroker(DatabaseBroker):
"""Encapsulates working with an account database."""
db_type = 'account'
@@ -49,6 +71,7 @@ class AccountBroker(DatabaseBroker):
'Attempting to create a new database with no account set')
self.create_container_table(conn)
self.create_account_stat_table(conn, put_timestamp)
+ self.create_policy_stat_table(conn)
def create_container_table(self, conn):
"""
@@ -64,7 +87,8 @@ class AccountBroker(DatabaseBroker):
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
- deleted INTEGER DEFAULT 0
+ deleted INTEGER DEFAULT 0,
+ storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
@@ -99,7 +123,7 @@ class AccountBroker(DatabaseBroker):
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
- """)
+ """ + POLICY_STAT_TRIGGER_SCRIPT)
def create_account_stat_table(self, conn, put_timestamp):
"""
@@ -134,6 +158,27 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
+ def create_policy_stat_table(self, conn):
+ """
+ Create policy_stat table which is specific to the account DB.
+ Not a part of Pluggable Back-ends, internal to the baseline code.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE policy_stat (
+ storage_policy_index INTEGER PRIMARY KEY,
+ object_count INTEGER DEFAULT 0,
+ bytes_used INTEGER DEFAULT 0
+ );
+ INSERT OR IGNORE INTO policy_stat (
+ storage_policy_index, object_count, bytes_used
+ )
+ SELECT 0, object_count, bytes_used
+ FROM account_stat
+ WHERE container_count > 0;
+ """)
+
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
@@ -159,16 +204,24 @@ class AccountBroker(DatabaseBroker):
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
- (name, put_timestamp, delete_timestamp,
- object_count, bytes_used, deleted) = \
- pickle.loads(entry.decode('base64'))
+ loaded = pickle.loads(entry.decode('base64'))
+ # check to see if the update includes policy_index or not
+ (name, put_timestamp, delete_timestamp, object_count, bytes_used,
+ deleted) = loaded[:6]
+ if len(loaded) > 6:
+ storage_policy_index = loaded[6]
+ else:
+ # legacy support during upgrade until first non legacy storage
+ # policy is defined
+ storage_policy_index = 0
item_list.append(
{'name': name,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
- 'deleted': deleted})
+ 'deleted': deleted,
+ 'storage_policy_index': storage_policy_index})
def empty(self):
"""
@@ -183,7 +236,7 @@ class AccountBroker(DatabaseBroker):
return (row[0] == 0)
def put_container(self, name, put_timestamp, delete_timestamp,
- object_count, bytes_used):
+ object_count, bytes_used, storage_policy_index):
"""
Create a container with the given attributes.
@@ -192,6 +245,7 @@ class AccountBroker(DatabaseBroker):
:param delete_timestamp: delete_timestamp of the container to create
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
+ :param storage_policy_index: the storage policy for this container
"""
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
@@ -202,7 +256,8 @@ class AccountBroker(DatabaseBroker):
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
- 'deleted': deleted}
+ 'deleted': deleted,
+ 'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
@@ -225,7 +280,7 @@ class AccountBroker(DatabaseBroker):
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
- bytes_used, deleted),
+ bytes_used, deleted, storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
@@ -255,19 +310,47 @@ class AccountBroker(DatabaseBroker):
FROM account_stat''').fetchone()
return (row['status'] == "DELETED")
+ def get_policy_stats(self):
+ """
+ Get global policy stats for the account.
+
+ :returns: dict of policy stats where the key is the policy index and
+ the value is a dictionary like {'object_count': M,
+ 'bytes_used': N}
+ """
+ info = []
+ self._commit_puts_stale_ok()
+ with self.get() as conn:
+ try:
+ info = (conn.execute('''
+ SELECT storage_policy_index, object_count, bytes_used
+ FROM policy_stat
+ ''').fetchall())
+ except sqlite3.OperationalError as err:
+ if "no such table: policy_stat" not in str(err):
+ raise
+
+ policy_stats = {}
+ for row in info:
+ stats = dict(row)
+ key = stats.pop('storage_policy_index')
+ policy_stats[key] = stats
+ return policy_stats
+
def get_info(self):
"""
Get global data for the account.
:returns: dict with keys: account, created_at, put_timestamp,
- delete_timestamp, container_count, object_count,
- bytes_used, hash, id
+ delete_timestamp, status_changed_at, container_count,
+ object_count, bytes_used, hash, id
"""
self._commit_puts_stale_ok()
with self.get() as conn:
return dict(conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
- container_count, object_count, bytes_used, hash, id
+ status_changed_at, container_count, object_count,
+ bytes_used, hash, id
FROM account_stat
''').fetchone())
@@ -359,18 +442,20 @@ class AccountBroker(DatabaseBroker):
:param item_list: list of dictionaries of {'name', 'put_timestamp',
'delete_timestamp', 'object_count', 'bytes_used',
- 'deleted'}
+ 'deleted', 'storage_policy_index'}
:param source: if defined, update incoming_sync with the source
"""
- with self.get() as conn:
+ def _really_merge_items(conn):
max_rowid = -1
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
- rec['bytes_used'], rec['deleted']]
+ rec['bytes_used'], rec['deleted'],
+ rec['storage_policy_index']]
query = '''
SELECT name, put_timestamp, delete_timestamp,
- object_count, bytes_used, deleted
+ object_count, bytes_used, deleted,
+ storage_policy_index
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
@@ -400,8 +485,8 @@ class AccountBroker(DatabaseBroker):
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
- deleted)
- VALUES (?, ?, ?, ?, ?, ?)
+ deleted, storage_policy_index)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
@@ -413,7 +498,33 @@ class AccountBroker(DatabaseBroker):
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
- UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ UPDATE incoming_sync
+ SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
+
+ with self.get() as conn:
+ # create the policy stat table if needed and add spi to container
+ try:
+ _really_merge_items(conn)
+ except sqlite3.OperationalError as err:
+ if 'no such column: storage_policy_index' not in str(err):
+ raise
+ self._migrate_add_storage_policy_index(conn)
+ _really_merge_items(conn)
+
+ def _migrate_add_storage_policy_index(self, conn):
+ """
+ Add the storage_policy_index column to the 'container' table and
+ set up triggers, creating the policy_stat table if needed.
+ """
+ try:
+ self.create_policy_stat_table(conn)
+ except sqlite3.OperationalError as err:
+ if 'table policy_stat already exists' not in str(err):
+ raise
+ conn.executescript('''
+ ALTER TABLE container
+ ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
+ ''' + POLICY_STAT_TRIGGER_SCRIPT)
diff --git a/swift/account/server.py b/swift/account/server.py
index 919b51bbf..6427b6b7d 100644
--- a/swift/account/server.py
+++ b/swift/account/server.py
@@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.account.backend import AccountBroker, DATADIR
-from swift.account.utils import account_listing_response
+from swift.account.utils import account_listing_response, get_response_headers
from swift.common.db import DatabaseConnectionError, DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
@@ -38,6 +38,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
HTTPPreconditionFailed, HTTPConflict, Request, \
HTTPInsufficientStorage, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta
+from swift.common.storage_policy import POLICY_INDEX
class AccountController(object):
@@ -108,6 +109,7 @@ class AccountController(object):
return HTTPInsufficientStorage(drive=drive, request=req)
if container: # put account container
pending_timeout = None
+ container_policy_index = req.headers.get(POLICY_INDEX, 0)
if 'x-trans-id' in req.headers:
pending_timeout = 3
broker = self._get_account_broker(drive, part, account,
@@ -125,7 +127,8 @@ class AccountController(object):
broker.put_container(container, req.headers['x-put-timestamp'],
req.headers['x-delete-timestamp'],
req.headers['x-object-count'],
- req.headers['x-bytes-used'])
+ req.headers['x-bytes-used'],
+ container_policy_index)
if req.headers['x-delete-timestamp'] > \
req.headers['x-put-timestamp']:
return HTTPNoContent(request=req)
@@ -172,16 +175,7 @@ class AccountController(object):
stale_reads_ok=True)
if broker.is_deleted():
return self._deleted_response(broker, req, HTTPNotFound)
- info = broker.get_info()
- headers = {
- 'X-Account-Container-Count': info['container_count'],
- 'X-Account-Object-Count': info['object_count'],
- 'X-Account-Bytes-Used': info['bytes_used'],
- 'X-Timestamp': info['created_at'],
- 'X-PUT-Timestamp': info['put_timestamp']}
- headers.update((key, value)
- for key, (value, timestamp) in
- broker.metadata.iteritems() if value != '')
+ headers = get_response_headers(broker)
headers['Content-Type'] = out_content_type
return HTTPNoContent(request=req, headers=headers, charset='utf-8')
diff --git a/swift/account/utils.py b/swift/account/utils.py
index 9854fae41..13dec505c 100644
--- a/swift/account/utils.py
+++ b/swift/account/utils.py
@@ -18,6 +18,7 @@ from xml.sax import saxutils
from swift.common.swob import HTTPOk, HTTPNoContent
from swift.common.utils import json, normalize_timestamp
+from swift.common.storage_policy import POLICIES
class FakeAccountBroker(object):
@@ -40,13 +41,11 @@ class FakeAccountBroker(object):
def metadata(self):
return {}
+ def get_policy_stats(self):
+ return {}
-def account_listing_response(account, req, response_content_type, broker=None,
- limit='', marker='', end_marker='', prefix='',
- delimiter=''):
- if broker is None:
- broker = FakeAccountBroker()
+def get_response_headers(broker):
info = broker.get_info()
resp_headers = {
'X-Account-Container-Count': info['container_count'],
@@ -54,9 +53,28 @@ def account_listing_response(account, req, response_content_type, broker=None,
'X-Account-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp']}
+ policy_stats = broker.get_policy_stats()
+ for policy_idx, stats in policy_stats.items():
+ policy = POLICIES.get_by_index(policy_idx)
+ if not policy:
+ continue
+ header_prefix = 'X-Account-Storage-Policy-%s-%%s' % policy.name
+ for key, value in stats.items():
+ header_name = header_prefix % key.replace('_', '-')
+ resp_headers[header_name] = value
resp_headers.update((key, value)
for key, (value, timestamp) in
broker.metadata.iteritems() if value != '')
+ return resp_headers
+
+
+def account_listing_response(account, req, response_content_type, broker=None,
+ limit='', marker='', end_marker='', prefix='',
+ delimiter=''):
+ if broker is None:
+ broker = FakeAccountBroker()
+
+ resp_headers = get_response_headers(broker)
account_list = broker.list_containers_iter(limit, marker, end_marker,
prefix, delimiter)
diff --git a/swift/container/server.py b/swift/container/server.py
index 15269f861..e028ddf9e 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -203,6 +203,7 @@ class ContainerController(object):
'x-object-count': info['object_count'],
'x-bytes-used': info['bytes_used'],
'x-trans-id': req.headers.get('x-trans-id', '-'),
+ POLICY_INDEX: info['storage_policy_index'],
'user-agent': 'container-server %s' % os.getpid(),
'referer': req.as_referer()})
if req.headers.get('x-account-override-deleted', 'no').lower() == \
diff --git a/swift/container/updater.py b/swift/container/updater.py
index fdc01e3fa..2a1316447 100644
--- a/swift/container/updater.py
+++ b/swift/container/updater.py
@@ -33,6 +33,7 @@ from swift.common.utils import get_logger, config_true_value, ismount, \
dump_recon_cache, quorum_size
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
+from swift.common.storage_policy import POLICY_INDEX
class ContainerUpdater(Daemon):
@@ -221,7 +222,8 @@ class ContainerUpdater(Daemon):
part, nodes = self.get_account_ring().get_nodes(info['account'])
events = [spawn(self.container_report, node, part, container,
info['put_timestamp'], info['delete_timestamp'],
- info['object_count'], info['bytes_used'])
+ info['object_count'], info['bytes_used'],
+ info['storage_policy_index'])
for node in nodes]
successes = 0
for event in events:
@@ -254,7 +256,8 @@ class ContainerUpdater(Daemon):
self.no_changes += 1
def container_report(self, node, part, container, put_timestamp,
- delete_timestamp, count, bytes):
+ delete_timestamp, count, bytes,
+ storage_policy_index):
"""
Report container info to an account server.
@@ -265,6 +268,7 @@ class ContainerUpdater(Daemon):
:param delete_timestamp: delete timestamp
:param count: object count in the container
:param bytes: bytes used in the container
+ :param storage_policy_index: the policy index for the container
"""
with ConnectionTimeout(self.conn_timeout):
try:
@@ -274,6 +278,7 @@ class ContainerUpdater(Daemon):
'X-Object-Count': count,
'X-Bytes-Used': bytes,
'X-Account-Override-Deleted': 'yes',
+ POLICY_INDEX: storage_policy_index,
'user-agent': self.user_agent}
conn = http_connect(
node['ip'], node['port'], node['device'], part,
diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py
index f75dd9a08..61a4f66c5 100644
--- a/test/unit/account/test_backend.py
+++ b/test/unit/account/test_backend.py
@@ -17,14 +17,24 @@
import hashlib
import unittest
+import pickle
+import os
from time import sleep, time
from uuid import uuid4
+from tempfile import mkdtemp
+from shutil import rmtree
+import sqlite3
+import itertools
+from contextlib import contextmanager
from swift.account.backend import AccountBroker
from swift.common.utils import normalize_timestamp
+from test.unit import patch_policies, with_tempdir
from swift.common.db import DatabaseConnectionError
+from swift.common.storage_policy import StoragePolicy, POLICIES
+@patch_policies
class TestAccountBroker(unittest.TestCase):
"""Tests for AccountBroker"""
@@ -70,16 +80,19 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
self.assert_(broker.empty())
- broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
self.assert_(not broker.empty())
sleep(.00001)
- broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
self.assert_(broker.empty())
def test_reclaim(self):
broker = AccountBroker(':memory:', account='test_account')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -96,7 +109,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
- broker.put_container('c', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -122,9 +136,9 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
# Test reclaim after deletion. Create 3 test containers
- broker.put_container('x', 0, 0, 0, 0)
- broker.put_container('y', 0, 0, 0, 0)
- broker.put_container('z', 0, 0, 0, 0)
+ broker.put_container('x', 0, 0, 0, 0, POLICIES.default.idx)
+ broker.put_container('y', 0, 0, 0, 0, POLICIES.default.idx)
+ broker.put_container('z', 0, 0, 0, 0, POLICIES.default.idx)
broker.reclaim(normalize_timestamp(time()), time())
# self.assertEqual(len(res), 2)
# self.assert_(isinstance(res, tuple))
@@ -144,11 +158,32 @@ class TestAccountBroker(unittest.TestCase):
# self.assert_('z' in containers)
# self.assert_('a' not in containers)
+ def test_delete_db_status(self):
+ start = int(time())
+ ts = itertools.count(start)
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+ info = broker.get_info()
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
+ self.assert_(float(info['created_at']) >= start)
+ self.assertEqual(info['delete_timestamp'], '0')
+ self.assertEqual(info['status_changed_at'], '0')
+
+ # delete it
+ delete_timestamp = normalize_timestamp(ts.next())
+ broker.delete_db(delete_timestamp)
+ info = broker.get_info()
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
+ self.assert_(float(info['created_at']) >= start)
+ self.assertEqual(info['delete_timestamp'], delete_timestamp)
+ self.assertEqual(info['status_changed_at'], delete_timestamp)
+
def test_delete_container(self):
# Test AccountBroker.delete_container
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -157,7 +192,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
- broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -173,7 +209,8 @@ class TestAccountBroker(unittest.TestCase):
# Create initial container
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -185,7 +222,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT deleted FROM container").fetchone()[0], 0)
# Reput same event
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -199,7 +237,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -212,7 +251,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old event
otimestamp = normalize_timestamp(float(timestamp) - 1)
- broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -225,7 +265,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old delete event
dtimestamp = normalize_timestamp(float(timestamp) - 1)
- broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -242,7 +283,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new delete event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -256,7 +298,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -275,31 +318,39 @@ class TestAccountBroker(unittest.TestCase):
info = broker.get_info()
self.assertEqual(info['account'], 'test1')
self.assertEqual(info['hash'], '00000000000000000000000000000000')
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(1))
+ self.assertEqual(info['delete_timestamp'], '0')
+ self.assertEqual(info['status_changed_at'], '0')
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
- broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
- broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
- broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
- broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
- broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
@@ -310,14 +361,17 @@ class TestAccountBroker(unittest.TestCase):
for cont1 in xrange(4):
for cont2 in xrange(125):
broker.put_container('%d-%04d' % (cont1, cont2),
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('2-0051-%04d' % cont,
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('3-%04d-0049' % cont,
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(100, '', None, None, '')
self.assertEqual(len(listing), 100)
@@ -381,7 +435,8 @@ class TestAccountBroker(unittest.TestCase):
'3-0047-', '3-0048', '3-0048-', '3-0049',
'3-0049-', '3-0050'])
- broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(10, '3-0048', None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@@ -406,16 +461,26 @@ class TestAccountBroker(unittest.TestCase):
# account that has an odd container with a trailing delimiter
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(15, None, None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@@ -437,9 +502,11 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(1),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
broker.put_container('b', normalize_timestamp(2),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
hasha = hashlib.md5(
'%s-%s' % ('a', '0000000001.00000-0000000000.00000-0-0')
).digest()
@@ -450,7 +517,8 @@ class TestAccountBroker(unittest.TestCase):
''.join(('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEqual(broker.get_info()['hash'], hashc)
broker.put_container('b', normalize_timestamp(3),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
hashb = hashlib.md5(
'%s-%s' % ('b', '0000000003.00000-0000000000.00000-0-0')
).digest()
@@ -463,15 +531,18 @@ class TestAccountBroker(unittest.TestCase):
broker1.initialize(normalize_timestamp('1'))
broker2 = AccountBroker(':memory:', account='a')
broker2.initialize(normalize_timestamp('1'))
- broker1.put_container('a', normalize_timestamp(1), 0, 0, 0)
- broker1.put_container('b', normalize_timestamp(2), 0, 0, 0)
+ broker1.put_container('a', normalize_timestamp(1), 0, 0, 0,
+ POLICIES.default.idx)
+ broker1.put_container('b', normalize_timestamp(2), 0, 0, 0,
+ POLICIES.default.idx)
id = broker1.get_info()['id']
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
self.assertEqual(len(items), 2)
self.assertEqual(['a', 'b'], sorted([rec['name'] for rec in items]))
- broker1.put_container('c', normalize_timestamp(3), 0, 0, 0)
+ broker1.put_container('c', normalize_timestamp(3), 0, 0, 0,
+ POLICIES.default.idx)
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
@@ -479,6 +550,145 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(['a', 'b', 'c'],
sorted([rec['name'] for rec in items]))
+ def test_load_old_pending_puts(self):
+ # pending puts from pre-storage-policy account brokers won't contain
+ # the storage policy index
+ tempdir = mkdtemp()
+ broker_path = os.path.join(tempdir, 'test-load-old.db')
+ try:
+ broker = AccountBroker(broker_path, account='real')
+ broker.initialize(normalize_timestamp(1))
+ with open(broker_path + '.pending', 'a+b') as pending:
+ pending.write(':')
+ pending.write(pickle.dumps(
+ # name, put_timestamp, delete_timestamp, object_count,
+ # bytes_used, deleted
+ ('oldcon', normalize_timestamp(200),
+ normalize_timestamp(0),
+ 896, 9216695, 0)).encode('base64'))
+
+ broker._commit_puts()
+ with broker.get() as conn:
+ results = list(conn.execute('''
+ SELECT name, storage_policy_index FROM container
+ '''))
+ self.assertEqual(len(results), 1)
+ self.assertEqual(dict(results[0]),
+ {'name': 'oldcon', 'storage_policy_index': 0})
+ finally:
+ rmtree(tempdir)
+
+ @patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True),
+ StoragePolicy(2, 'two', False),
+ StoragePolicy(3, 'three', False)])
+ def test_get_policy_stats(self):
+ ts = itertools.count()
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+ # check empty policy_stats
+ self.assertTrue(broker.empty())
+ policy_stats = broker.get_policy_stats()
+ self.assertEqual(policy_stats, {})
+
+ # add some empty containers
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ put_timestamp = normalize_timestamp(ts.next())
+ broker.put_container(container_name,
+ put_timestamp, 0,
+ 0, 0,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+
+ # update the containers object & byte count
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ put_timestamp = normalize_timestamp(ts.next())
+ count = policy.idx * 100 # good as any integer
+ broker.put_container(container_name,
+ put_timestamp, 0,
+ count, count,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], count)
+ self.assertEqual(stats['bytes_used'], count)
+
+ # check all the policy_stats at once
+ for policy_index, stats in policy_stats.items():
+ policy = POLICIES[policy_index]
+ count = policy.idx * 100 # coupled with policy for test
+ self.assertEqual(stats['object_count'], count)
+ self.assertEqual(stats['bytes_used'], count)
+
+ # now delete the containers one by one
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ delete_timestamp = normalize_timestamp(ts.next())
+ broker.put_container(container_name,
+ 0, delete_timestamp,
+ 0, 0,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+
+ @patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True)])
+ def test_policy_stats_tracking(self):
+ ts = itertools.count()
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+
+ # policy 0
+ broker.put_container('con1', ts.next(), 0, 12, 2798641, 0)
+ broker.put_container('con1', ts.next(), 0, 13, 8156441, 0)
+ # policy 1
+ broker.put_container('con2', ts.next(), 0, 7, 5751991, 1)
+ broker.put_container('con2', ts.next(), 0, 8, 6085379, 1)
+
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 13)
+ self.assertEqual(stats[0]['bytes_used'], 8156441)
+ self.assertEqual(stats[1]['object_count'], 8)
+ self.assertEqual(stats[1]['bytes_used'], 6085379)
+
+ # Break encapsulation here to make sure that there's only 2 rows in
+ # the stats table. It's possible that there could be 4 rows (one per
+ # put_container) but that they came out in the right order so that
+ # get_policy_stats() collapsed them down to the right number. To prove
+ # that's not so, we have to go peek at the broker's internals.
+ with broker.get() as conn:
+ nrows = conn.execute(
+ "SELECT COUNT(*) FROM policy_stat").fetchall()[0][0]
+ self.assertEqual(nrows, 2)
+
+
+def prespi_AccountBroker_initialize(self, conn, put_timestamp, **kwargs):
+ """
+ The AccountBroker initialze() function before we added the
+ policy stat table. Used by test_policy_table_creation() to
+ make sure that the AccountBroker will correctly add the table
+ for cases where the DB existed before the policy suport was added.
+
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ self.create_container_table(conn)
+ self.create_account_stat_table(conn, put_timestamp)
+
def premetadata_create_account_stat_table(self, conn, put_timestamp):
"""
@@ -543,3 +753,274 @@ class TestAccountBrokerBeforeMetadata(TestAccountBroker):
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM account_stat')
+
+
+def prespi_create_container_table(self, conn):
+ """
+ Copied from AccountBroker before the sstoage_policy_index column was
+ added; used for testing with TestAccountBrokerBeforeSPI.
+
+ Create container table which is specific to the account DB.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE container (
+ ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT,
+ put_timestamp TEXT,
+ delete_timestamp TEXT,
+ object_count INTEGER,
+ bytes_used INTEGER,
+ deleted INTEGER DEFAULT 0
+ );
+
+ CREATE INDEX ix_container_deleted_name ON
+ container (deleted, name);
+
+ CREATE TRIGGER container_insert AFTER INSERT ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used,
+ hash = chexor(hash, new.name,
+ new.put_timestamp || '-' ||
+ new.delete_timestamp || '-' ||
+ new.object_count || '-' || new.bytes_used);
+ END;
+
+ CREATE TRIGGER container_update BEFORE UPDATE ON container
+ BEGIN
+ SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
+ END;
+
+
+ CREATE TRIGGER container_delete AFTER DELETE ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used,
+ hash = chexor(hash, old.name,
+ old.put_timestamp || '-' ||
+ old.delete_timestamp || '-' ||
+ old.object_count || '-' || old.bytes_used);
+ END;
+ """)
+
+
+class TestAccountBrokerBeforeSPI(TestAccountBroker):
+ """
+ Tests for AccountBroker against databases created before
+ the storage_policy_index column was added.
+ """
+
+ def setUp(self):
+ self._imported_create_container_table = \
+ AccountBroker.create_container_table
+ AccountBroker.create_container_table = \
+ prespi_create_container_table
+ self._imported_initialize = AccountBroker._initialize
+ AccountBroker._initialize = prespi_AccountBroker_initialize
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp('1'))
+ exc = None
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT storage_policy_index FROM container')
+ except BaseException as err:
+ exc = err
+ self.assert_('no such column: storage_policy_index' in str(exc))
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT * FROM policy_stat')
+ except sqlite3.OperationalError as err:
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('database created with policy_stat table')
+
+ def tearDown(self):
+ AccountBroker.create_container_table = \
+ self._imported_create_container_table
+ AccountBroker._initialize = self._imported_initialize
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ conn.execute('SELECT storage_policy_index FROM container')
+
+ @with_tempdir
+ def test_policy_table_migration(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+
+ # first init an acct DB without the policy_stat table present
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ try:
+ conn.execute('''
+ SELECT * FROM policy_stat
+ ''').fetchone()[0]
+ except sqlite3.OperationalError as err:
+ # confirm that the table really isn't there
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('broker did not raise sqlite3.OperationalError '
+ 'trying to select from policy_stat table!')
+
+ # make sure we can HEAD this thing w/o the table
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 0)
+
+ # now do a PUT to create the table
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker._commit_puts_stale_ok()
+
+ # now confirm that the table was created
+ with broker.get() as conn:
+ conn.execute('SELECT * FROM policy_stat')
+
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 1)
+
+ @patch_policies
+ @with_tempdir
+ def test_container_table_migration(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+
+ # first init an acct DB without the policy_stat table present
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ try:
+ conn.execute('''
+ SELECT storage_policy_index FROM container
+ ''').fetchone()[0]
+ except sqlite3.OperationalError as err:
+ # confirm that the table doesn't have this column
+ self.assert_('no such column: storage_policy_index' in
+ str(err))
+ else:
+ self.fail('broker did not raise sqlite3.OperationalError '
+ 'trying to select from storage_policy_index '
+ 'from container table!')
+
+ # manually insert an existing row to avoid migration
+ with broker.get() as conn:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', ('test_name', normalize_timestamp(time()), 0, 1, 2, 0))
+ conn.commit()
+
+ # make sure we can iter containers without the migration
+ for c in broker.list_containers_iter(1, None, None, None, None):
+ self.assertEqual(c, ('test_name', 1, 2, 0))
+
+ # stats table is mysteriously empty...
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 0)
+
+ # now do a PUT with a different value for storage_policy_index
+ # which will update the DB schema as well as update policy_stats
+ # for legacy containers in the DB (those without an SPI)
+ other_policy = [p for p in POLICIES if p.idx != 0][0]
+ broker.put_container('test_second', normalize_timestamp(time()),
+ 0, 3, 4, other_policy.idx)
+ broker._commit_puts_stale_ok()
+
+ with broker.get() as conn:
+ rows = conn.execute('''
+ SELECT name, storage_policy_index FROM container
+ ''').fetchall()
+ for row in rows:
+ if row[0] == 'test_name':
+ self.assertEqual(row[1], 0)
+ else:
+ self.assertEqual(row[1], other_policy.idx)
+
+ # we should have stats for both containers
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 1)
+ self.assertEqual(stats[0]['bytes_used'], 2)
+ self.assertEqual(stats[1]['object_count'], 3)
+ self.assertEqual(stats[1]['bytes_used'], 4)
+
+ # now lets delete a container and make sure policy_stats is OK
+ with broker.get() as conn:
+ conn.execute('''
+ DELETE FROM container WHERE name = ?
+ ''', ('test_name',))
+ conn.commit()
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 0)
+ self.assertEqual(stats[0]['bytes_used'], 0)
+ self.assertEqual(stats[1]['object_count'], 3)
+ self.assertEqual(stats[1]['bytes_used'], 4)
+
+ @with_tempdir
+ def test_half_upgraded_database(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+ ts = itertools.count()
+
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+
+ self.assertTrue(broker.empty())
+
+ # add a container (to pending file)
+ broker.put_container('c', normalize_timestamp(ts.next()), 0, 0, 0,
+ POLICIES.default.idx)
+
+ real_get = broker.get
+ called = []
+
+ @contextmanager
+ def mock_get():
+ with real_get() as conn:
+
+ def mock_executescript(script):
+ if called:
+ raise Exception('kaboom!')
+ called.append(script)
+
+ conn.executescript = mock_executescript
+ yield conn
+
+ broker.get = mock_get
+
+ try:
+ broker._commit_puts()
+ except Exception:
+ pass
+ else:
+ self.fail('mock exception was not raised')
+
+ self.assertEqual(len(called), 1)
+ self.assert_('CREATE TABLE policy_stat' in called[0])
+
+ # nothing was commited
+ broker = AccountBroker(db_path, account='a')
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT * FROM policy_stat')
+ except sqlite3.OperationalError as err:
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('half upgraded database!')
+ container_count = conn.execute(
+ 'SELECT count(*) FROM container').fetchone()[0]
+ self.assertEqual(container_count, 0)
+
+ # try again to commit puts
+ self.assertFalse(broker.empty())
+
+ # full migration successful
+ with broker.get() as conn:
+ conn.execute('SELECT * FROM policy_stat')
+ conn.execute('SELECT storage_policy_index FROM container')
diff --git a/test/unit/account/test_replicator.py b/test/unit/account/test_replicator.py
index 0bba02325..43e3a4d72 100644
--- a/test/unit/account/test_replicator.py
+++ b/test/unit/account/test_replicator.py
@@ -13,18 +13,132 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import time
import unittest
+import shutil
+from swift.account import replicator, backend, server
+from swift.common.utils import normalize_timestamp
+from swift.common.storage_policy import POLICIES
-class TestReplicator(unittest.TestCase):
- """
- swift.account.replicator is currently just a subclass with some class
- variables overridden, but at least this test stub will ensure proper Python
- syntax.
- """
+from test.unit.common import test_db_replicator
- def test_placeholder(self):
- pass
+
+class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
+
+ backend = backend.AccountBroker
+ datadir = server.DATADIR
+ replicator_daemon = replicator.AccountReplicator
+
+ def test_sync(self):
+ broker = self._get_broker('a', node_index=0)
+ put_timestamp = normalize_timestamp(time.time())
+ broker.initialize(put_timestamp)
+ # "replicate" to same database
+ daemon = replicator.AccountReplicator({})
+ part, node = self._get_broker_part_node(broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ # nothing to do
+ self.assertTrue(success)
+ self.assertEqual(1, daemon.stats['no_change'])
+
+ def test_sync_remote_missing(self):
+ broker = self._get_broker('a', node_index=0)
+ put_timestamp = time.time()
+ broker.initialize(put_timestamp)
+ # "replicate" to all other nodes
+ part, node = self._get_broker_part_node(broker)
+ daemon = self._run_once(node)
+ # complete rsync
+ self.assertEqual(2, daemon.stats['rsync'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ for i in range(1, 3):
+ remote_broker = self._get_broker('a', node_index=i)
+ self.assertTrue(os.path.exists(remote_broker.db_file))
+ remote_info = remote_broker.get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
+
+ def test_sync_remote_missing_most_rows(self):
+ put_timestamp = time.time()
+ # create "local" broker
+ broker = self._get_broker('a', node_index=0)
+ broker.initialize(put_timestamp)
+ # create "remote" broker
+ remote_broker = self._get_broker('a', node_index=1)
+ remote_broker.initialize(put_timestamp)
+ # add a row to "local" db
+ broker.put_container('/a/c', time.time(), 0, 0, 0,
+ POLICIES.default.idx)
+ #replicate
+ daemon = replicator.AccountReplicator({})
+
+ def _rsync_file(db_file, remote_file, **kwargs):
+ remote_server, remote_path = remote_file.split('/', 1)
+ dest_path = os.path.join(self.root, remote_path)
+ shutil.copy(db_file, dest_path)
+ return True
+ daemon._rsync_file = _rsync_file
+ part, node = self._get_broker_part_node(remote_broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ self.assertTrue(success)
+ # row merge
+ self.assertEqual(1, daemon.stats['remote_merge'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ remote_info = self._get_broker(
+ 'a', node_index=1).get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
+
+ def test_sync_remote_missing_one_rows(self):
+ put_timestamp = time.time()
+ # create "local" broker
+ broker = self._get_broker('a', node_index=0)
+ broker.initialize(put_timestamp)
+ # create "remote" broker
+ remote_broker = self._get_broker('a', node_index=1)
+ remote_broker.initialize(put_timestamp)
+ # add some rows to both db
+ for i in range(10):
+ put_timestamp = time.time()
+ for db in (broker, remote_broker):
+ path = '/a/c_%s' % i
+ db.put_container(path, put_timestamp, 0, 0, 0,
+ POLICIES.default.idx)
+ # now a row to the "local" broker only
+ broker.put_container('/a/c_missing', time.time(), 0, 0, 0,
+ POLICIES.default.idx)
+ # replicate
+ daemon = replicator.AccountReplicator({})
+ part, node = self._get_broker_part_node(remote_broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ self.assertTrue(success)
+ # row merge
+ self.assertEqual(1, daemon.stats['diff'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ remote_info = self._get_broker(
+ 'a', node_index=1).get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
if __name__ == '__main__':
diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py
index 9e6cc0d7d..0eca82899 100644
--- a/test/unit/account/test_server.py
+++ b/test/unit/account/test_server.py
@@ -22,6 +22,8 @@ from shutil import rmtree
from StringIO import StringIO
from time import gmtime
from test.unit import FakeLogger
+import itertools
+import random
import simplejson
import xml.dom.minidom
@@ -31,8 +33,11 @@ from swift.common import constraints
from swift.account.server import AccountController
from swift.common.utils import normalize_timestamp, replication, public
from swift.common.request_helpers import get_sys_meta_prefix
+from test.unit import patch_policies
+from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
+@patch_policies
class TestAccountController(unittest.TestCase):
"""Test swift.account.server.AccountController"""
def setUp(self):
@@ -1670,6 +1675,182 @@ class TestAccountController(unittest.TestCase):
[(('1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a" 404 '
'- "-" "-" "-" 2.0000 "-"',), {})])
+ def test_policy_stats_with_legacy(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add a container
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # read back rollup
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
+ self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
+ POLICIES[0].name], '2')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
+ POLICIES[0].name], '4')
+
+ def test_policy_stats_non_default(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add a container
+ non_default_policies = [p for p in POLICIES if not p.is_default]
+ policy = random.choice(non_default_policies)
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # read back rollup
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
+ self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
+ policy.name], '2')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
+ policy.name], '4')
+
+ def test_empty_policy_stats(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ self.assert_('storage-policy' not in key.lower())
+
+ def test_empty_except_for_used_policies(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # starts empty
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ self.assert_('storage-policy' not in key.lower())
+
+ # add a container
+ policy = random.choice(POLICIES)
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # only policy of the created container should be in headers
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ if 'storage-policy' in key.lower():
+ self.assert_(policy.name.lower() in key.lower())
+
+ def test_multiple_policies_in_use(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add some containers
+ for policy in POLICIES:
+ count = policy.idx * 100 # good as any integer
+ container_path = '/sda1/p/a/c_%s' % policy.name
+ req = Request.blank(
+ container_path, method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': count,
+ 'X-Bytes-Used': count,
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ req = Request.blank('/sda1/p/a', method='HEAD')
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+
+ # check container counts in roll up headers
+ total_object_count = 0
+ total_bytes_used = 0
+ for key in resp.headers:
+ if 'storage-policy' not in key.lower():
+ continue
+ for policy in POLICIES:
+ if policy.name.lower() not in key.lower():
+ continue
+ if key.lower().endswith('object-count'):
+ object_count = int(resp.headers[key])
+ self.assertEqual(policy.idx * 100, object_count)
+ total_object_count += object_count
+ if key.lower().endswith('bytes-used'):
+ bytes_used = int(resp.headers[key])
+ self.assertEqual(policy.idx * 100, bytes_used)
+ total_bytes_used += bytes_used
+
+ expected_total_count = sum([p.idx * 100 for p in POLICIES])
+ self.assertEqual(expected_total_count, total_object_count)
+ self.assertEqual(expected_total_count, total_bytes_used)
+
+
+@patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True),
+ StoragePolicy(2, 'two', False),
+ StoragePolicy(3, 'three', False)])
+class TestNonLegacyDefaultStoragePolicy(TestAccountController):
+
+ pass
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py
index d9e77d353..182319630 100644
--- a/test/unit/container/test_server.py
+++ b/test/unit/container/test_server.py
@@ -2268,6 +2268,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
+ POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
@@ -2285,6 +2286,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
+ POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})