summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--swift/account/backend.py149
-rw-r--r--swift/account/server.py18
-rw-r--r--swift/account/utils.py28
-rw-r--r--swift/container/server.py1
-rw-r--r--swift/container/updater.py9
-rw-r--r--test/unit/account/test_backend.py563
-rw-r--r--test/unit/account/test_replicator.py130
-rw-r--r--test/unit/account/test_server.py181
-rw-r--r--test/unit/container/test_server.py2
9 files changed, 994 insertions, 87 deletions
diff --git a/swift/account/backend.py b/swift/account/backend.py
index 1a940c49e..815ccd5ef 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -31,6 +31,28 @@ from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
DATADIR = 'accounts'
+POLICY_STAT_TRIGGER_SCRIPT = """
+ CREATE TRIGGER container_insert_ps AFTER INSERT ON container
+ BEGIN
+ INSERT OR IGNORE INTO policy_stat
+ (storage_policy_index, object_count, bytes_used)
+ VALUES (new.storage_policy_index, 0, 0);
+ UPDATE policy_stat
+ SET object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used
+ WHERE storage_policy_index = new.storage_policy_index;
+ END;
+ CREATE TRIGGER container_delete_ps AFTER DELETE ON container
+ BEGIN
+ UPDATE policy_stat
+ SET object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used
+ WHERE storage_policy_index = old.storage_policy_index;
+ END;
+
+"""
+
+
class AccountBroker(DatabaseBroker):
"""Encapsulates working with an account database."""
db_type = 'account'
@@ -49,6 +71,7 @@ class AccountBroker(DatabaseBroker):
'Attempting to create a new database with no account set')
self.create_container_table(conn)
self.create_account_stat_table(conn, put_timestamp)
+ self.create_policy_stat_table(conn)
def create_container_table(self, conn):
"""
@@ -64,7 +87,8 @@ class AccountBroker(DatabaseBroker):
delete_timestamp TEXT,
object_count INTEGER,
bytes_used INTEGER,
- deleted INTEGER DEFAULT 0
+ deleted INTEGER DEFAULT 0,
+ storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
@@ -99,7 +123,7 @@ class AccountBroker(DatabaseBroker):
old.delete_timestamp || '-' ||
old.object_count || '-' || old.bytes_used);
END;
- """)
+ """ + POLICY_STAT_TRIGGER_SCRIPT)
def create_account_stat_table(self, conn, put_timestamp):
"""
@@ -134,6 +158,27 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
+ def create_policy_stat_table(self, conn):
+ """
+ Create policy_stat table which is specific to the account DB.
+ Not a part of Pluggable Back-ends, internal to the baseline code.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE policy_stat (
+ storage_policy_index INTEGER PRIMARY KEY,
+ object_count INTEGER DEFAULT 0,
+ bytes_used INTEGER DEFAULT 0
+ );
+ INSERT OR IGNORE INTO policy_stat (
+ storage_policy_index, object_count, bytes_used
+ )
+ SELECT 0, object_count, bytes_used
+ FROM account_stat
+ WHERE container_count > 0;
+ """)
+
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
@@ -159,16 +204,24 @@ class AccountBroker(DatabaseBroker):
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
- (name, put_timestamp, delete_timestamp,
- object_count, bytes_used, deleted) = \
- pickle.loads(entry.decode('base64'))
+ loaded = pickle.loads(entry.decode('base64'))
+ # check to see if the update includes policy_index or not
+ (name, put_timestamp, delete_timestamp, object_count, bytes_used,
+ deleted) = loaded[:6]
+ if len(loaded) > 6:
+ storage_policy_index = loaded[6]
+ else:
+ # legacy support during upgrade until first non legacy storage
+ # policy is defined
+ storage_policy_index = 0
item_list.append(
{'name': name,
'put_timestamp': put_timestamp,
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
- 'deleted': deleted})
+ 'deleted': deleted,
+ 'storage_policy_index': storage_policy_index})
def empty(self):
"""
@@ -183,7 +236,7 @@ class AccountBroker(DatabaseBroker):
return (row[0] == 0)
def put_container(self, name, put_timestamp, delete_timestamp,
- object_count, bytes_used):
+ object_count, bytes_used, storage_policy_index):
"""
Create a container with the given attributes.
@@ -192,6 +245,7 @@ class AccountBroker(DatabaseBroker):
:param delete_timestamp: delete_timestamp of the container to create
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
+ :param storage_policy_index: the storage policy for this container
"""
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
@@ -202,7 +256,8 @@ class AccountBroker(DatabaseBroker):
'delete_timestamp': delete_timestamp,
'object_count': object_count,
'bytes_used': bytes_used,
- 'deleted': deleted}
+ 'deleted': deleted,
+ 'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
@@ -225,7 +280,7 @@ class AccountBroker(DatabaseBroker):
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
- bytes_used, deleted),
+ bytes_used, deleted, storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
@@ -255,19 +310,47 @@ class AccountBroker(DatabaseBroker):
FROM account_stat''').fetchone()
return (row['status'] == "DELETED")
+ def get_policy_stats(self):
+ """
+ Get global policy stats for the account.
+
+ :returns: dict of policy stats where the key is the policy index and
+ the value is a dictionary like {'object_count': M,
+ 'bytes_used': N}
+ """
+ info = []
+ self._commit_puts_stale_ok()
+ with self.get() as conn:
+ try:
+ info = (conn.execute('''
+ SELECT storage_policy_index, object_count, bytes_used
+ FROM policy_stat
+ ''').fetchall())
+ except sqlite3.OperationalError as err:
+ if "no such table: policy_stat" not in str(err):
+ raise
+
+ policy_stats = {}
+ for row in info:
+ stats = dict(row)
+ key = stats.pop('storage_policy_index')
+ policy_stats[key] = stats
+ return policy_stats
+
def get_info(self):
"""
Get global data for the account.
:returns: dict with keys: account, created_at, put_timestamp,
- delete_timestamp, container_count, object_count,
- bytes_used, hash, id
+ delete_timestamp, status_changed_at, container_count,
+ object_count, bytes_used, hash, id
"""
self._commit_puts_stale_ok()
with self.get() as conn:
return dict(conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
- container_count, object_count, bytes_used, hash, id
+ status_changed_at, container_count, object_count,
+ bytes_used, hash, id
FROM account_stat
''').fetchone())
@@ -359,18 +442,20 @@ class AccountBroker(DatabaseBroker):
:param item_list: list of dictionaries of {'name', 'put_timestamp',
'delete_timestamp', 'object_count', 'bytes_used',
- 'deleted'}
+ 'deleted', 'storage_policy_index'}
:param source: if defined, update incoming_sync with the source
"""
- with self.get() as conn:
+ def _really_merge_items(conn):
max_rowid = -1
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
- rec['bytes_used'], rec['deleted']]
+ rec['bytes_used'], rec['deleted'],
+ rec['storage_policy_index']]
query = '''
SELECT name, put_timestamp, delete_timestamp,
- object_count, bytes_used, deleted
+ object_count, bytes_used, deleted,
+ storage_policy_index
FROM container WHERE name = ?
'''
if self.get_db_version(conn) >= 1:
@@ -400,8 +485,8 @@ class AccountBroker(DatabaseBroker):
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
- deleted)
- VALUES (?, ?, ?, ?, ?, ?)
+ deleted, storage_policy_index)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
''', record)
if source:
max_rowid = max(max_rowid, rec['ROWID'])
@@ -413,7 +498,33 @@ class AccountBroker(DatabaseBroker):
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
- UPDATE incoming_sync SET sync_point=max(?, sync_point)
+ UPDATE incoming_sync
+ SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
+
+ with self.get() as conn:
+ # create the policy stat table if needed and add spi to container
+ try:
+ _really_merge_items(conn)
+ except sqlite3.OperationalError as err:
+ if 'no such column: storage_policy_index' not in str(err):
+ raise
+ self._migrate_add_storage_policy_index(conn)
+ _really_merge_items(conn)
+
+ def _migrate_add_storage_policy_index(self, conn):
+ """
+ Add the storage_policy_index column to the 'container' table and
+ set up triggers, creating the policy_stat table if needed.
+ """
+ try:
+ self.create_policy_stat_table(conn)
+ except sqlite3.OperationalError as err:
+ if 'table policy_stat already exists' not in str(err):
+ raise
+ conn.executescript('''
+ ALTER TABLE container
+ ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
+ ''' + POLICY_STAT_TRIGGER_SCRIPT)
diff --git a/swift/account/server.py b/swift/account/server.py
index 919b51bbf..6427b6b7d 100644
--- a/swift/account/server.py
+++ b/swift/account/server.py
@@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.account.backend import AccountBroker, DATADIR
-from swift.account.utils import account_listing_response
+from swift.account.utils import account_listing_response, get_response_headers
from swift.common.db import DatabaseConnectionError, DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
@@ -38,6 +38,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
HTTPPreconditionFailed, HTTPConflict, Request, \
HTTPInsufficientStorage, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta
+from swift.common.storage_policy import POLICY_INDEX
class AccountController(object):
@@ -108,6 +109,7 @@ class AccountController(object):
return HTTPInsufficientStorage(drive=drive, request=req)
if container: # put account container
pending_timeout = None
+ container_policy_index = req.headers.get(POLICY_INDEX, 0)
if 'x-trans-id' in req.headers:
pending_timeout = 3
broker = self._get_account_broker(drive, part, account,
@@ -125,7 +127,8 @@ class AccountController(object):
broker.put_container(container, req.headers['x-put-timestamp'],
req.headers['x-delete-timestamp'],
req.headers['x-object-count'],
- req.headers['x-bytes-used'])
+ req.headers['x-bytes-used'],
+ container_policy_index)
if req.headers['x-delete-timestamp'] > \
req.headers['x-put-timestamp']:
return HTTPNoContent(request=req)
@@ -172,16 +175,7 @@ class AccountController(object):
stale_reads_ok=True)
if broker.is_deleted():
return self._deleted_response(broker, req, HTTPNotFound)
- info = broker.get_info()
- headers = {
- 'X-Account-Container-Count': info['container_count'],
- 'X-Account-Object-Count': info['object_count'],
- 'X-Account-Bytes-Used': info['bytes_used'],
- 'X-Timestamp': info['created_at'],
- 'X-PUT-Timestamp': info['put_timestamp']}
- headers.update((key, value)
- for key, (value, timestamp) in
- broker.metadata.iteritems() if value != '')
+ headers = get_response_headers(broker)
headers['Content-Type'] = out_content_type
return HTTPNoContent(request=req, headers=headers, charset='utf-8')
diff --git a/swift/account/utils.py b/swift/account/utils.py
index 9854fae41..13dec505c 100644
--- a/swift/account/utils.py
+++ b/swift/account/utils.py
@@ -18,6 +18,7 @@ from xml.sax import saxutils
from swift.common.swob import HTTPOk, HTTPNoContent
from swift.common.utils import json, normalize_timestamp
+from swift.common.storage_policy import POLICIES
class FakeAccountBroker(object):
@@ -40,13 +41,11 @@ class FakeAccountBroker(object):
def metadata(self):
return {}
+ def get_policy_stats(self):
+ return {}
-def account_listing_response(account, req, response_content_type, broker=None,
- limit='', marker='', end_marker='', prefix='',
- delimiter=''):
- if broker is None:
- broker = FakeAccountBroker()
+def get_response_headers(broker):
info = broker.get_info()
resp_headers = {
'X-Account-Container-Count': info['container_count'],
@@ -54,9 +53,28 @@ def account_listing_response(account, req, response_content_type, broker=None,
'X-Account-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp']}
+ policy_stats = broker.get_policy_stats()
+ for policy_idx, stats in policy_stats.items():
+ policy = POLICIES.get_by_index(policy_idx)
+ if not policy:
+ continue
+ header_prefix = 'X-Account-Storage-Policy-%s-%%s' % policy.name
+ for key, value in stats.items():
+ header_name = header_prefix % key.replace('_', '-')
+ resp_headers[header_name] = value
resp_headers.update((key, value)
for key, (value, timestamp) in
broker.metadata.iteritems() if value != '')
+ return resp_headers
+
+
+def account_listing_response(account, req, response_content_type, broker=None,
+ limit='', marker='', end_marker='', prefix='',
+ delimiter=''):
+ if broker is None:
+ broker = FakeAccountBroker()
+
+ resp_headers = get_response_headers(broker)
account_list = broker.list_containers_iter(limit, marker, end_marker,
prefix, delimiter)
diff --git a/swift/container/server.py b/swift/container/server.py
index 15269f861..e028ddf9e 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -203,6 +203,7 @@ class ContainerController(object):
'x-object-count': info['object_count'],
'x-bytes-used': info['bytes_used'],
'x-trans-id': req.headers.get('x-trans-id', '-'),
+ POLICY_INDEX: info['storage_policy_index'],
'user-agent': 'container-server %s' % os.getpid(),
'referer': req.as_referer()})
if req.headers.get('x-account-override-deleted', 'no').lower() == \
diff --git a/swift/container/updater.py b/swift/container/updater.py
index fdc01e3fa..2a1316447 100644
--- a/swift/container/updater.py
+++ b/swift/container/updater.py
@@ -33,6 +33,7 @@ from swift.common.utils import get_logger, config_true_value, ismount, \
dump_recon_cache, quorum_size
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
+from swift.common.storage_policy import POLICY_INDEX
class ContainerUpdater(Daemon):
@@ -221,7 +222,8 @@ class ContainerUpdater(Daemon):
part, nodes = self.get_account_ring().get_nodes(info['account'])
events = [spawn(self.container_report, node, part, container,
info['put_timestamp'], info['delete_timestamp'],
- info['object_count'], info['bytes_used'])
+ info['object_count'], info['bytes_used'],
+ info['storage_policy_index'])
for node in nodes]
successes = 0
for event in events:
@@ -254,7 +256,8 @@ class ContainerUpdater(Daemon):
self.no_changes += 1
def container_report(self, node, part, container, put_timestamp,
- delete_timestamp, count, bytes):
+ delete_timestamp, count, bytes,
+ storage_policy_index):
"""
Report container info to an account server.
@@ -265,6 +268,7 @@ class ContainerUpdater(Daemon):
:param delete_timestamp: delete timestamp
:param count: object count in the container
:param bytes: bytes used in the container
+ :param storage_policy_index: the policy index for the container
"""
with ConnectionTimeout(self.conn_timeout):
try:
@@ -274,6 +278,7 @@ class ContainerUpdater(Daemon):
'X-Object-Count': count,
'X-Bytes-Used': bytes,
'X-Account-Override-Deleted': 'yes',
+ POLICY_INDEX: storage_policy_index,
'user-agent': self.user_agent}
conn = http_connect(
node['ip'], node['port'], node['device'], part,
diff --git a/test/unit/account/test_backend.py b/test/unit/account/test_backend.py
index f75dd9a08..61a4f66c5 100644
--- a/test/unit/account/test_backend.py
+++ b/test/unit/account/test_backend.py
@@ -17,14 +17,24 @@
import hashlib
import unittest
+import pickle
+import os
from time import sleep, time
from uuid import uuid4
+from tempfile import mkdtemp
+from shutil import rmtree
+import sqlite3
+import itertools
+from contextlib import contextmanager
from swift.account.backend import AccountBroker
from swift.common.utils import normalize_timestamp
+from test.unit import patch_policies, with_tempdir
from swift.common.db import DatabaseConnectionError
+from swift.common.storage_policy import StoragePolicy, POLICIES
+@patch_policies
class TestAccountBroker(unittest.TestCase):
"""Tests for AccountBroker"""
@@ -70,16 +80,19 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
self.assert_(broker.empty())
- broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
self.assert_(not broker.empty())
sleep(.00001)
- broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
self.assert_(broker.empty())
def test_reclaim(self):
broker = AccountBroker(':memory:', account='test_account')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -96,7 +109,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
- broker.put_container('c', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -122,9 +136,9 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
# Test reclaim after deletion. Create 3 test containers
- broker.put_container('x', 0, 0, 0, 0)
- broker.put_container('y', 0, 0, 0, 0)
- broker.put_container('z', 0, 0, 0, 0)
+ broker.put_container('x', 0, 0, 0, 0, POLICIES.default.idx)
+ broker.put_container('y', 0, 0, 0, 0, POLICIES.default.idx)
+ broker.put_container('z', 0, 0, 0, 0, POLICIES.default.idx)
broker.reclaim(normalize_timestamp(time()), time())
# self.assertEqual(len(res), 2)
# self.assert_(isinstance(res, tuple))
@@ -144,11 +158,32 @@ class TestAccountBroker(unittest.TestCase):
# self.assert_('z' in containers)
# self.assert_('a' not in containers)
+ def test_delete_db_status(self):
+ start = int(time())
+ ts = itertools.count(start)
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+ info = broker.get_info()
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
+ self.assert_(float(info['created_at']) >= start)
+ self.assertEqual(info['delete_timestamp'], '0')
+ self.assertEqual(info['status_changed_at'], '0')
+
+ # delete it
+ delete_timestamp = normalize_timestamp(ts.next())
+ broker.delete_db(delete_timestamp)
+ info = broker.get_info()
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(start))
+ self.assert_(float(info['created_at']) >= start)
+ self.assertEqual(info['delete_timestamp'], delete_timestamp)
+ self.assertEqual(info['status_changed_at'], delete_timestamp)
+
def test_delete_container(self):
# Test AccountBroker.delete_container
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('o', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -157,7 +192,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT count(*) FROM container "
"WHERE deleted = 1").fetchone()[0], 0)
sleep(.00001)
- broker.put_container('o', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('o', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT count(*) FROM container "
@@ -173,7 +209,8 @@ class TestAccountBroker(unittest.TestCase):
# Create initial container
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -185,7 +222,8 @@ class TestAccountBroker(unittest.TestCase):
"SELECT deleted FROM container").fetchone()[0], 0)
# Reput same event
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -199,7 +237,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -212,7 +251,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old event
otimestamp = normalize_timestamp(float(timestamp) - 1)
- broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', otimestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -225,7 +265,8 @@ class TestAccountBroker(unittest.TestCase):
# Put old delete event
dtimestamp = normalize_timestamp(float(timestamp) - 1)
- broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', 0, dtimestamp, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -242,7 +283,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new delete event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', 0, timestamp, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -256,7 +298,8 @@ class TestAccountBroker(unittest.TestCase):
# Put new event
sleep(.00001)
timestamp = normalize_timestamp(time())
- broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0)
+ broker.put_container('"{<container \'&\' name>}"', timestamp, 0, 0, 0,
+ POLICIES.default.idx)
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM container").fetchone()[0],
@@ -275,31 +318,39 @@ class TestAccountBroker(unittest.TestCase):
info = broker.get_info()
self.assertEqual(info['account'], 'test1')
self.assertEqual(info['hash'], '00000000000000000000000000000000')
+ self.assertEqual(info['put_timestamp'], normalize_timestamp(1))
+ self.assertEqual(info['delete_timestamp'], '0')
+ self.assertEqual(info['status_changed_at'], '0')
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
- broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c1', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
- broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
- broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('c2', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 2)
sleep(.00001)
- broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c1', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 1)
sleep(.00001)
- broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0)
+ broker.put_container('c2', 0, normalize_timestamp(time()), 0, 0,
+ POLICIES.default.idx)
info = broker.get_info()
self.assertEqual(info['container_count'], 0)
@@ -310,14 +361,17 @@ class TestAccountBroker(unittest.TestCase):
for cont1 in xrange(4):
for cont2 in xrange(125):
broker.put_container('%d-%04d' % (cont1, cont2),
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('2-0051-%04d' % cont,
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
for cont in xrange(125):
broker.put_container('3-%04d-0049' % cont,
- normalize_timestamp(time()), 0, 0, 0)
+ normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(100, '', None, None, '')
self.assertEqual(len(listing), 100)
@@ -381,7 +435,8 @@ class TestAccountBroker(unittest.TestCase):
'3-0047-', '3-0048', '3-0048-', '3-0049',
'3-0049-', '3-0050'])
- broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('3-0049-', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(10, '3-0048', None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@@ -406,16 +461,26 @@ class TestAccountBroker(unittest.TestCase):
# account that has an odd container with a trailing delimiter
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
- broker.put_container('a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0)
- broker.put_container('c', normalize_timestamp(time()), 0, 0, 0)
+ broker.put_container('a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-a-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('a-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b-a', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('b-b', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker.put_container('c', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
listing = broker.list_containers_iter(15, None, None, None, None)
self.assertEqual(len(listing), 10)
self.assertEqual([row[0] for row in listing],
@@ -437,9 +502,11 @@ class TestAccountBroker(unittest.TestCase):
broker = AccountBroker(':memory:', account='a')
broker.initialize(normalize_timestamp('1'))
broker.put_container('a', normalize_timestamp(1),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
broker.put_container('b', normalize_timestamp(2),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
hasha = hashlib.md5(
'%s-%s' % ('a', '0000000001.00000-0000000000.00000-0-0')
).digest()
@@ -450,7 +517,8 @@ class TestAccountBroker(unittest.TestCase):
''.join(('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEqual(broker.get_info()['hash'], hashc)
broker.put_container('b', normalize_timestamp(3),
- normalize_timestamp(0), 0, 0)
+ normalize_timestamp(0), 0, 0,
+ POLICIES.default.idx)
hashb = hashlib.md5(
'%s-%s' % ('b', '0000000003.00000-0000000000.00000-0-0')
).digest()
@@ -463,15 +531,18 @@ class TestAccountBroker(unittest.TestCase):
broker1.initialize(normalize_timestamp('1'))
broker2 = AccountBroker(':memory:', account='a')
broker2.initialize(normalize_timestamp('1'))
- broker1.put_container('a', normalize_timestamp(1), 0, 0, 0)
- broker1.put_container('b', normalize_timestamp(2), 0, 0, 0)
+ broker1.put_container('a', normalize_timestamp(1), 0, 0, 0,
+ POLICIES.default.idx)
+ broker1.put_container('b', normalize_timestamp(2), 0, 0, 0,
+ POLICIES.default.idx)
id = broker1.get_info()['id']
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
self.assertEqual(len(items), 2)
self.assertEqual(['a', 'b'], sorted([rec['name'] for rec in items]))
- broker1.put_container('c', normalize_timestamp(3), 0, 0, 0)
+ broker1.put_container('c', normalize_timestamp(3), 0, 0, 0,
+ POLICIES.default.idx)
broker2.merge_items(broker1.get_items_since(
broker2.get_sync(id), 1000), id)
items = broker2.get_items_since(-1, 1000)
@@ -479,6 +550,145 @@ class TestAccountBroker(unittest.TestCase):
self.assertEqual(['a', 'b', 'c'],
sorted([rec['name'] for rec in items]))
+ def test_load_old_pending_puts(self):
+ # pending puts from pre-storage-policy account brokers won't contain
+ # the storage policy index
+ tempdir = mkdtemp()
+ broker_path = os.path.join(tempdir, 'test-load-old.db')
+ try:
+ broker = AccountBroker(broker_path, account='real')
+ broker.initialize(normalize_timestamp(1))
+ with open(broker_path + '.pending', 'a+b') as pending:
+ pending.write(':')
+ pending.write(pickle.dumps(
+ # name, put_timestamp, delete_timestamp, object_count,
+ # bytes_used, deleted
+ ('oldcon', normalize_timestamp(200),
+ normalize_timestamp(0),
+ 896, 9216695, 0)).encode('base64'))
+
+ broker._commit_puts()
+ with broker.get() as conn:
+ results = list(conn.execute('''
+ SELECT name, storage_policy_index FROM container
+ '''))
+ self.assertEqual(len(results), 1)
+ self.assertEqual(dict(results[0]),
+ {'name': 'oldcon', 'storage_policy_index': 0})
+ finally:
+ rmtree(tempdir)
+
+ @patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True),
+ StoragePolicy(2, 'two', False),
+ StoragePolicy(3, 'three', False)])
+ def test_get_policy_stats(self):
+ ts = itertools.count()
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+ # check empty policy_stats
+ self.assertTrue(broker.empty())
+ policy_stats = broker.get_policy_stats()
+ self.assertEqual(policy_stats, {})
+
+ # add some empty containers
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ put_timestamp = normalize_timestamp(ts.next())
+ broker.put_container(container_name,
+ put_timestamp, 0,
+ 0, 0,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+
+ # update the containers object & byte count
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ put_timestamp = normalize_timestamp(ts.next())
+ count = policy.idx * 100 # good as any integer
+ broker.put_container(container_name,
+ put_timestamp, 0,
+ count, count,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], count)
+ self.assertEqual(stats['bytes_used'], count)
+
+ # check all the policy_stats at once
+ for policy_index, stats in policy_stats.items():
+ policy = POLICIES[policy_index]
+ count = policy.idx * 100 # coupled with policy for test
+ self.assertEqual(stats['object_count'], count)
+ self.assertEqual(stats['bytes_used'], count)
+
+ # now delete the containers one by one
+ for policy in POLICIES:
+ container_name = 'c-%s' % policy.name
+ delete_timestamp = normalize_timestamp(ts.next())
+ broker.put_container(container_name,
+ 0, delete_timestamp,
+ 0, 0,
+ policy.idx)
+
+ policy_stats = broker.get_policy_stats()
+ stats = policy_stats[policy.idx]
+ self.assertEqual(stats['object_count'], 0)
+ self.assertEqual(stats['bytes_used'], 0)
+
+ @patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True)])
+ def test_policy_stats_tracking(self):
+ ts = itertools.count()
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+
+ # policy 0
+ broker.put_container('con1', ts.next(), 0, 12, 2798641, 0)
+ broker.put_container('con1', ts.next(), 0, 13, 8156441, 0)
+ # policy 1
+ broker.put_container('con2', ts.next(), 0, 7, 5751991, 1)
+ broker.put_container('con2', ts.next(), 0, 8, 6085379, 1)
+
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 13)
+ self.assertEqual(stats[0]['bytes_used'], 8156441)
+ self.assertEqual(stats[1]['object_count'], 8)
+ self.assertEqual(stats[1]['bytes_used'], 6085379)
+
+ # Break encapsulation here to make sure that there's only 2 rows in
+ # the stats table. It's possible that there could be 4 rows (one per
+ # put_container) but that they came out in the right order so that
+ # get_policy_stats() collapsed them down to the right number. To prove
+ # that's not so, we have to go peek at the broker's internals.
+ with broker.get() as conn:
+ nrows = conn.execute(
+ "SELECT COUNT(*) FROM policy_stat").fetchall()[0][0]
+ self.assertEqual(nrows, 2)
+
+
+def prespi_AccountBroker_initialize(self, conn, put_timestamp, **kwargs):
+ """
+ The AccountBroker initialze() function before we added the
+ policy stat table. Used by test_policy_table_creation() to
+ make sure that the AccountBroker will correctly add the table
+ for cases where the DB existed before the policy suport was added.
+
+ :param conn: DB connection object
+ :param put_timestamp: put timestamp
+ """
+ if not self.account:
+ raise ValueError(
+ 'Attempting to create a new database with no account set')
+ self.create_container_table(conn)
+ self.create_account_stat_table(conn, put_timestamp)
+
def premetadata_create_account_stat_table(self, conn, put_timestamp):
"""
@@ -543,3 +753,274 @@ class TestAccountBrokerBeforeMetadata(TestAccountBroker):
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT metadata FROM account_stat')
+
+
+def prespi_create_container_table(self, conn):
+ """
+ Copied from AccountBroker before the sstoage_policy_index column was
+ added; used for testing with TestAccountBrokerBeforeSPI.
+
+ Create container table which is specific to the account DB.
+
+ :param conn: DB connection object
+ """
+ conn.executescript("""
+ CREATE TABLE container (
+ ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT,
+ put_timestamp TEXT,
+ delete_timestamp TEXT,
+ object_count INTEGER,
+ bytes_used INTEGER,
+ deleted INTEGER DEFAULT 0
+ );
+
+ CREATE INDEX ix_container_deleted_name ON
+ container (deleted, name);
+
+ CREATE TRIGGER container_insert AFTER INSERT ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count + (1 - new.deleted),
+ object_count = object_count + new.object_count,
+ bytes_used = bytes_used + new.bytes_used,
+ hash = chexor(hash, new.name,
+ new.put_timestamp || '-' ||
+ new.delete_timestamp || '-' ||
+ new.object_count || '-' || new.bytes_used);
+ END;
+
+ CREATE TRIGGER container_update BEFORE UPDATE ON container
+ BEGIN
+ SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
+ END;
+
+
+ CREATE TRIGGER container_delete AFTER DELETE ON container
+ BEGIN
+ UPDATE account_stat
+ SET container_count = container_count - (1 - old.deleted),
+ object_count = object_count - old.object_count,
+ bytes_used = bytes_used - old.bytes_used,
+ hash = chexor(hash, old.name,
+ old.put_timestamp || '-' ||
+ old.delete_timestamp || '-' ||
+ old.object_count || '-' || old.bytes_used);
+ END;
+ """)
+
+
+class TestAccountBrokerBeforeSPI(TestAccountBroker):
+ """
+ Tests for AccountBroker against databases created before
+ the storage_policy_index column was added.
+ """
+
+ def setUp(self):
+ self._imported_create_container_table = \
+ AccountBroker.create_container_table
+ AccountBroker.create_container_table = \
+ prespi_create_container_table
+ self._imported_initialize = AccountBroker._initialize
+ AccountBroker._initialize = prespi_AccountBroker_initialize
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp('1'))
+ exc = None
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT storage_policy_index FROM container')
+ except BaseException as err:
+ exc = err
+ self.assert_('no such column: storage_policy_index' in str(exc))
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT * FROM policy_stat')
+ except sqlite3.OperationalError as err:
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('database created with policy_stat table')
+
+ def tearDown(self):
+ AccountBroker.create_container_table = \
+ self._imported_create_container_table
+ AccountBroker._initialize = self._imported_initialize
+ broker = AccountBroker(':memory:', account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ conn.execute('SELECT storage_policy_index FROM container')
+
+ @with_tempdir
+ def test_policy_table_migration(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+
+ # first init an acct DB without the policy_stat table present
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ try:
+ conn.execute('''
+ SELECT * FROM policy_stat
+ ''').fetchone()[0]
+ except sqlite3.OperationalError as err:
+ # confirm that the table really isn't there
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('broker did not raise sqlite3.OperationalError '
+ 'trying to select from policy_stat table!')
+
+ # make sure we can HEAD this thing w/o the table
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 0)
+
+ # now do a PUT to create the table
+ broker.put_container('o', normalize_timestamp(time()), 0, 0, 0,
+ POLICIES.default.idx)
+ broker._commit_puts_stale_ok()
+
+ # now confirm that the table was created
+ with broker.get() as conn:
+ conn.execute('SELECT * FROM policy_stat')
+
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 1)
+
+ @patch_policies
+ @with_tempdir
+ def test_container_table_migration(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+
+ # first init an acct DB without the policy_stat table present
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp('1'))
+ with broker.get() as conn:
+ try:
+ conn.execute('''
+ SELECT storage_policy_index FROM container
+ ''').fetchone()[0]
+ except sqlite3.OperationalError as err:
+ # confirm that the table doesn't have this column
+ self.assert_('no such column: storage_policy_index' in
+ str(err))
+ else:
+ self.fail('broker did not raise sqlite3.OperationalError '
+ 'trying to select from storage_policy_index '
+ 'from container table!')
+
+ # manually insert an existing row to avoid migration
+ with broker.get() as conn:
+ conn.execute('''
+ INSERT INTO container (name, put_timestamp,
+ delete_timestamp, object_count, bytes_used,
+ deleted)
+ VALUES (?, ?, ?, ?, ?, ?)
+ ''', ('test_name', normalize_timestamp(time()), 0, 1, 2, 0))
+ conn.commit()
+
+ # make sure we can iter containers without the migration
+ for c in broker.list_containers_iter(1, None, None, None, None):
+ self.assertEqual(c, ('test_name', 1, 2, 0))
+
+ # stats table is mysteriously empty...
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 0)
+
+ # now do a PUT with a different value for storage_policy_index
+ # which will update the DB schema as well as update policy_stats
+ # for legacy containers in the DB (those without an SPI)
+ other_policy = [p for p in POLICIES if p.idx != 0][0]
+ broker.put_container('test_second', normalize_timestamp(time()),
+ 0, 3, 4, other_policy.idx)
+ broker._commit_puts_stale_ok()
+
+ with broker.get() as conn:
+ rows = conn.execute('''
+ SELECT name, storage_policy_index FROM container
+ ''').fetchall()
+ for row in rows:
+ if row[0] == 'test_name':
+ self.assertEqual(row[1], 0)
+ else:
+ self.assertEqual(row[1], other_policy.idx)
+
+ # we should have stats for both containers
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 1)
+ self.assertEqual(stats[0]['bytes_used'], 2)
+ self.assertEqual(stats[1]['object_count'], 3)
+ self.assertEqual(stats[1]['bytes_used'], 4)
+
+ # now lets delete a container and make sure policy_stats is OK
+ with broker.get() as conn:
+ conn.execute('''
+ DELETE FROM container WHERE name = ?
+ ''', ('test_name',))
+ conn.commit()
+ stats = broker.get_policy_stats()
+ self.assertEqual(len(stats), 2)
+ self.assertEqual(stats[0]['object_count'], 0)
+ self.assertEqual(stats[0]['bytes_used'], 0)
+ self.assertEqual(stats[1]['object_count'], 3)
+ self.assertEqual(stats[1]['bytes_used'], 4)
+
+ @with_tempdir
+ def test_half_upgraded_database(self, tempdir):
+ db_path = os.path.join(tempdir, 'account.db')
+ ts = itertools.count()
+
+ broker = AccountBroker(db_path, account='a')
+ broker.initialize(normalize_timestamp(ts.next()))
+
+ self.assertTrue(broker.empty())
+
+ # add a container (to pending file)
+ broker.put_container('c', normalize_timestamp(ts.next()), 0, 0, 0,
+ POLICIES.default.idx)
+
+ real_get = broker.get
+ called = []
+
+ @contextmanager
+ def mock_get():
+ with real_get() as conn:
+
+ def mock_executescript(script):
+ if called:
+ raise Exception('kaboom!')
+ called.append(script)
+
+ conn.executescript = mock_executescript
+ yield conn
+
+ broker.get = mock_get
+
+ try:
+ broker._commit_puts()
+ except Exception:
+ pass
+ else:
+ self.fail('mock exception was not raised')
+
+ self.assertEqual(len(called), 1)
+ self.assert_('CREATE TABLE policy_stat' in called[0])
+
+ # nothing was commited
+ broker = AccountBroker(db_path, account='a')
+ with broker.get() as conn:
+ try:
+ conn.execute('SELECT * FROM policy_stat')
+ except sqlite3.OperationalError as err:
+ self.assert_('no such table: policy_stat' in str(err))
+ else:
+ self.fail('half upgraded database!')
+ container_count = conn.execute(
+ 'SELECT count(*) FROM container').fetchone()[0]
+ self.assertEqual(container_count, 0)
+
+ # try again to commit puts
+ self.assertFalse(broker.empty())
+
+ # full migration successful
+ with broker.get() as conn:
+ conn.execute('SELECT * FROM policy_stat')
+ conn.execute('SELECT storage_policy_index FROM container')
diff --git a/test/unit/account/test_replicator.py b/test/unit/account/test_replicator.py
index 0bba02325..43e3a4d72 100644
--- a/test/unit/account/test_replicator.py
+++ b/test/unit/account/test_replicator.py
@@ -13,18 +13,132 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import time
import unittest
+import shutil
+from swift.account import replicator, backend, server
+from swift.common.utils import normalize_timestamp
+from swift.common.storage_policy import POLICIES
-class TestReplicator(unittest.TestCase):
- """
- swift.account.replicator is currently just a subclass with some class
- variables overridden, but at least this test stub will ensure proper Python
- syntax.
- """
+from test.unit.common import test_db_replicator
- def test_placeholder(self):
- pass
+
+class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
+
+ backend = backend.AccountBroker
+ datadir = server.DATADIR
+ replicator_daemon = replicator.AccountReplicator
+
+ def test_sync(self):
+ broker = self._get_broker('a', node_index=0)
+ put_timestamp = normalize_timestamp(time.time())
+ broker.initialize(put_timestamp)
+ # "replicate" to same database
+ daemon = replicator.AccountReplicator({})
+ part, node = self._get_broker_part_node(broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ # nothing to do
+ self.assertTrue(success)
+ self.assertEqual(1, daemon.stats['no_change'])
+
+ def test_sync_remote_missing(self):
+ broker = self._get_broker('a', node_index=0)
+ put_timestamp = time.time()
+ broker.initialize(put_timestamp)
+ # "replicate" to all other nodes
+ part, node = self._get_broker_part_node(broker)
+ daemon = self._run_once(node)
+ # complete rsync
+ self.assertEqual(2, daemon.stats['rsync'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ for i in range(1, 3):
+ remote_broker = self._get_broker('a', node_index=i)
+ self.assertTrue(os.path.exists(remote_broker.db_file))
+ remote_info = remote_broker.get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
+
+ def test_sync_remote_missing_most_rows(self):
+ put_timestamp = time.time()
+ # create "local" broker
+ broker = self._get_broker('a', node_index=0)
+ broker.initialize(put_timestamp)
+ # create "remote" broker
+ remote_broker = self._get_broker('a', node_index=1)
+ remote_broker.initialize(put_timestamp)
+ # add a row to "local" db
+ broker.put_container('/a/c', time.time(), 0, 0, 0,
+ POLICIES.default.idx)
+ #replicate
+ daemon = replicator.AccountReplicator({})
+
+ def _rsync_file(db_file, remote_file, **kwargs):
+ remote_server, remote_path = remote_file.split('/', 1)
+ dest_path = os.path.join(self.root, remote_path)
+ shutil.copy(db_file, dest_path)
+ return True
+ daemon._rsync_file = _rsync_file
+ part, node = self._get_broker_part_node(remote_broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ self.assertTrue(success)
+ # row merge
+ self.assertEqual(1, daemon.stats['remote_merge'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ remote_info = self._get_broker(
+ 'a', node_index=1).get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
+
+ def test_sync_remote_missing_one_rows(self):
+ put_timestamp = time.time()
+ # create "local" broker
+ broker = self._get_broker('a', node_index=0)
+ broker.initialize(put_timestamp)
+ # create "remote" broker
+ remote_broker = self._get_broker('a', node_index=1)
+ remote_broker.initialize(put_timestamp)
+ # add some rows to both db
+ for i in range(10):
+ put_timestamp = time.time()
+ for db in (broker, remote_broker):
+ path = '/a/c_%s' % i
+ db.put_container(path, put_timestamp, 0, 0, 0,
+ POLICIES.default.idx)
+ # now a row to the "local" broker only
+ broker.put_container('/a/c_missing', time.time(), 0, 0, 0,
+ POLICIES.default.idx)
+ # replicate
+ daemon = replicator.AccountReplicator({})
+ part, node = self._get_broker_part_node(remote_broker)
+ info = broker.get_replication_info()
+ success = daemon._repl_to_node(node, broker, part, info)
+ self.assertTrue(success)
+ # row merge
+ self.assertEqual(1, daemon.stats['diff'])
+ local_info = self._get_broker(
+ 'a', node_index=0).get_info()
+ remote_info = self._get_broker(
+ 'a', node_index=1).get_info()
+ for k, v in local_info.items():
+ if k == 'id':
+ continue
+ self.assertEqual(remote_info[k], v,
+ "mismatch remote %s %r != %r" % (
+ k, remote_info[k], v))
if __name__ == '__main__':
diff --git a/test/unit/account/test_server.py b/test/unit/account/test_server.py
index 9e6cc0d7d..0eca82899 100644
--- a/test/unit/account/test_server.py
+++ b/test/unit/account/test_server.py
@@ -22,6 +22,8 @@ from shutil import rmtree
from StringIO import StringIO
from time import gmtime
from test.unit import FakeLogger
+import itertools
+import random
import simplejson
import xml.dom.minidom
@@ -31,8 +33,11 @@ from swift.common import constraints
from swift.account.server import AccountController
from swift.common.utils import normalize_timestamp, replication, public
from swift.common.request_helpers import get_sys_meta_prefix
+from test.unit import patch_policies
+from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
+@patch_policies
class TestAccountController(unittest.TestCase):
"""Test swift.account.server.AccountController"""
def setUp(self):
@@ -1670,6 +1675,182 @@ class TestAccountController(unittest.TestCase):
[(('1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a" 404 '
'- "-" "-" "-" 2.0000 "-"',), {})])
+ def test_policy_stats_with_legacy(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add a container
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # read back rollup
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
+ self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
+ POLICIES[0].name], '2')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
+ POLICIES[0].name], '4')
+
+ def test_policy_stats_non_default(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add a container
+ non_default_policies = [p for p in POLICIES if not p.is_default]
+ policy = random.choice(non_default_policies)
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # read back rollup
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ self.assertEquals(resp.headers['X-Account-Object-Count'], '2')
+ self.assertEquals(resp.headers['X-Account-Bytes-Used'], '4')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Object-Count' %
+ policy.name], '2')
+ self.assertEquals(
+ resp.headers['X-Account-Storage-Policy-%s-Bytes-Used' %
+ policy.name], '4')
+
+ def test_empty_policy_stats(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ self.assert_('storage-policy' not in key.lower())
+
+ def test_empty_except_for_used_policies(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # starts empty
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ self.assert_('storage-policy' not in key.lower())
+
+ # add a container
+ policy = random.choice(POLICIES)
+ req = Request.blank('/sda1/p/a/c1', method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': '2',
+ 'X-Bytes-Used': '4',
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ # only policy of the created container should be in headers
+ for method in ('GET', 'HEAD'):
+ req = Request.blank('/sda1/p/a', method=method)
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+ for key in resp.headers:
+ if 'storage-policy' in key.lower():
+ self.assert_(policy.name.lower() in key.lower())
+
+ def test_multiple_policies_in_use(self):
+ ts = itertools.count()
+ # create the account
+ req = Request.blank('/sda1/p/a', method='PUT', headers={
+ 'X-Timestamp': normalize_timestamp(ts.next())})
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201) # sanity
+
+ # add some containers
+ for policy in POLICIES:
+ count = policy.idx * 100 # good as any integer
+ container_path = '/sda1/p/a/c_%s' % policy.name
+ req = Request.blank(
+ container_path, method='PUT', headers={
+ 'X-Put-Timestamp': normalize_timestamp(ts.next()),
+ 'X-Delete-Timestamp': '0',
+ 'X-Object-Count': count,
+ 'X-Bytes-Used': count,
+ POLICY_INDEX: policy.idx,
+ })
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int, 201)
+
+ req = Request.blank('/sda1/p/a', method='HEAD')
+ resp = req.get_response(self.controller)
+ self.assertEqual(resp.status_int // 100, 2)
+
+ # check container counts in roll up headers
+ total_object_count = 0
+ total_bytes_used = 0
+ for key in resp.headers:
+ if 'storage-policy' not in key.lower():
+ continue
+ for policy in POLICIES:
+ if policy.name.lower() not in key.lower():
+ continue
+ if key.lower().endswith('object-count'):
+ object_count = int(resp.headers[key])
+ self.assertEqual(policy.idx * 100, object_count)
+ total_object_count += object_count
+ if key.lower().endswith('bytes-used'):
+ bytes_used = int(resp.headers[key])
+ self.assertEqual(policy.idx * 100, bytes_used)
+ total_bytes_used += bytes_used
+
+ expected_total_count = sum([p.idx * 100 for p in POLICIES])
+ self.assertEqual(expected_total_count, total_object_count)
+ self.assertEqual(expected_total_count, total_bytes_used)
+
+
+@patch_policies([StoragePolicy(0, 'zero', False),
+ StoragePolicy(1, 'one', True),
+ StoragePolicy(2, 'two', False),
+ StoragePolicy(3, 'three', False)])
+class TestNonLegacyDefaultStoragePolicy(TestAccountController):
+
+ pass
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py
index d9e77d353..182319630 100644
--- a/test/unit/container/test_server.py
+++ b/test/unit/container/test_server.py
@@ -2268,6 +2268,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
+ POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
@@ -2285,6 +2286,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': '0000012345.00000',
+ POLICY_INDEX: '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})