author      Greg Lange <greglange@gmail.com>    2011-04-11 23:13:01 +0000
committer   Tarmac <>                           2011-04-11 23:13:01 +0000
commit      bac7d7380abee217877449c351654835c0b85362 (patch)
tree        91945e3b751e0d760d6cd8c26f31141304e338f2
parent      bc9ec20033302d0ae63e20127403f8d97285654c (diff)
parent      b81dc66a225fef4edf8cd0936cbe763cf74ed4c5 (diff)
download    swift-bac7d7380abee217877449c351654835c0b85362.tar.gz
Refactored the log processing daemon to make it more testable, and added tests for it.
The refactoring should not change behavior at all, but it needs to be tested extensively on staging before being pushed to production.
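A minimal sketch of the stubbing pattern this enables, mirroring the new tests: log_processor.py now aliases the clock at module level (now = datetime.datetime.now), so a test can pin "now" to a fixed value and restore it afterwards.

    import datetime
    from swift.stats import log_processor

    try:
        # pin the module-level clock to a deterministic value for the test
        log_processor.now = lambda: datetime.datetime(2011, 1, 1)
        # ... exercise time-dependent code such as get_lookback_interval() ...
    finally:
        # restore the real clock so other tests are unaffected
        log_processor.now = datetime.datetime.now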
-rw-r--r--   swift/stats/log_processor.py            290
-rw-r--r--   test/unit/stats/test_log_processor.py   413
2 files changed, 634 insertions, 69 deletions
diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py
index 29f8a7524..f2cfd220c 100644
--- a/swift/stats/log_processor.py
+++ b/swift/stats/log_processor.py
@@ -30,6 +30,8 @@ from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon
+now = datetime.datetime.now
+
class BadFileDownload(Exception):
def __init__(self, status_code=None):
@@ -234,31 +236,46 @@ class LogProcessorDaemon(Daemon):
self.log_processor_container = c.get('container_name',
'log_processing_data')
self.worker_count = int(c.get('worker_count', '1'))
+ self._keylist_mapping = None
+ self.processed_files_filename = 'processed_files.pickle.gz'
- def run_once(self, *args, **kwargs):
- for k in 'lookback_hours lookback_window'.split():
- if kwargs[k] is not None:
- setattr(self, k, kwargs[k])
+ def get_lookback_interval(self):
+ """
+ :returns: lookback_start, lookback_end.
+
+ Both or just lookback_end can be None. Otherwise, returns strings
+ of the form 'YYYYMMDDHH'. The interval returned is used as bounds
+ when looking for logs to process.
+
+ A returned None means don't limit the log files examined on that
+ side of the interval.
+ """
- self.logger.info(_("Beginning log processing"))
- start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
- lookback_start = datetime.datetime.now() - delta_hours
+ lookback_start = now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
- lookback_end = datetime.datetime.now() - \
+ lookback_end = now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
- self.logger.debug('lookback_start: %s' % lookback_start)
- self.logger.debug('lookback_end: %s' % lookback_end)
+ return lookback_start, lookback_end
+
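A worked example of the interval arithmetic above, using the values exercised by the new test (now() pinned to 2011-01-01 00:00, lookback_hours=120, lookback_window=24):

    # lookback_start = now() - 120 hours              -> '2010122700'
    # lookback_end   = now() - 120 hours + 24 hours   -> '2010122800'
    # i.e. only log files stamped between those two hours are considered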
+ def get_processed_files_list(self):
+ """
+ :returns: a set of files that have already been processed, or None
+ on error.
+
+ Downloads the set from the stats account. Creates an empty set if
+ an existing file cannot be found.
+ """
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
@@ -266,44 +283,52 @@ class LogProcessorDaemon(Daemon):
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
- processed_files_stream = self.log_processor.get_object_data(
- self.log_processor_account,
- self.log_processor_container,
- 'processed_files.pickle.gz',
- compressed=True)
- buf = '\n'.join(x for x in processed_files_stream)
+ stream = self.log_processor.get_object_data(
+ self.log_processor_account,
+ self.log_processor_container,
+ self.processed_files_filename,
+ compressed=True)
+ buf = '\n'.join(x for x in stream)
if buf:
- already_processed_files = cPickle.loads(buf)
+ files = cPickle.loads(buf)
else:
- already_processed_files = set()
+ return None
except BadFileDownload, err:
if err.status_code == 404:
- already_processed_files = set()
+ files = set()
else:
- self.logger.error(_('Log processing unable to load list of '
- 'already processed log files'))
- return
- self.logger.debug(_('found %d processed files') % \
- len(already_processed_files))
- logs_to_process = self.log_processor.get_data_list(lookback_start,
- lookback_end,
- already_processed_files)
- self.logger.info(_('loaded %d files to process') %
- len(logs_to_process))
- if not logs_to_process:
- self.logger.info(_("Log processing done (%0.2f minutes)") %
- ((time.time() - start) / 60))
- return
+ return None
+ return files
- # map
- processor_args = (self.total_conf, self.logger)
- results = multiprocess_collate(processor_args, logs_to_process,
- self.worker_count)
+ def get_aggregate_data(self, processed_files, input_data):
+ """
+ Aggregates stats data by account/hour, summing as needed.
+
+ :param processed_files: set of processed files
+ :param input_data: the output from multiprocess_collate/the plugins.
+
+ :returns: A dict containing data aggregated from the input_data
+ passed in.
+
+ The dict returned has tuple keys of the form:
+ (account, year, month, day, hour)
+ The dict returned has values that are dicts with items of this
+ form:
+ key:field_value
+ - key corresponds to something in one of the plugin's keylist
+ mapping, something like the tuple (source, level, verb, code)
+ - field_value is the sum of the field_values for the
+ corresponding values in the input
+
+ Both input_data and the dict returned are hourly aggregations of
+ stats.
+
+ Multiple values for the same (account, hour, tuple key) found in
+ input_data are summed in the dict returned.
+ """
- #reduce
aggr_data = {}
- processed_files = already_processed_files
- for item, data in results:
+ for item, data in input_data:
# since item contains the plugin and the log name, new plugins will
# "reprocess" the file and the results will be in the final csv.
processed_files.add(item)
@@ -315,14 +340,30 @@ class LogProcessorDaemon(Daemon):
# processing plugins need to realize this
existing_data[i] = current + j
aggr_data[k] = existing_data
+ return aggr_data
+
+ def get_final_info(self, aggr_data):
+ """
+ Aggregates data from aggr_data based on the keylist mapping.
+
+ :param aggr_data: The results of the get_aggregate_data function.
+ :returns: a dict of further aggregated data
+
+ The dict returned has keys of the form:
+ (account, year, month, day, hour)
+ The dict returned has values that are dicts with items of this
+ form:
+ 'field_name': field_value (int)
+
+ Data is aggregated as specified by the keylist mapping. The
+ keylist mapping specifies which keys to combine in aggr_data
+ and the final field_names for these combined keys in the dict
+ returned. Fields combined are summed.
+ """
- # group
- # reduce a large number of keys in aggr_data[k] to a small number of
- # output keys
- keylist_mapping = self.log_processor.generate_keylist_mapping()
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
- for key, mapping in keylist_mapping.items():
+ for key, mapping in self.keylist_mapping.items():
if isinstance(mapping, (list, set)):
value = 0
for k in mapping:
@@ -336,37 +377,154 @@ class LogProcessorDaemon(Daemon):
except KeyError:
value = 0
final_info[account][key] = value
+ return final_info
- # output
- sorted_keylist_mapping = sorted(keylist_mapping)
- columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
- out_buf = [columns]
+ def store_processed_files_list(self, processed_files):
+ """
+ Stores the processed files list in the stats account.
+
+ :param processed_files: set of processed files
+ """
+
+ s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
+ f = cStringIO.StringIO(s)
+ self.log_processor.internal_proxy.upload_file(f,
+ self.log_processor_account,
+ self.log_processor_container,
+ self.processed_files_filename)
+
+ def get_output(self, final_info):
+ """
+ :returns: a list of rows to appear in the csv file.
+
+ The first row contains the column headers for the rest of the
+ rows in the returned list.
+
+ Each row after the first row corresponds to an account's data
+ for that hour.
+ """
+
+ sorted_keylist_mapping = sorted(self.keylist_mapping)
+ columns = ['data_ts', 'account'] + sorted_keylist_mapping
+ output = [columns]
for (account, year, month, day, hour), d in final_info.items():
- data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
- row = [data_ts]
- row.append('%s' % account)
+ data_ts = '%04d/%02d/%02d %02d:00:00' % \
+ (int(year), int(month), int(day), int(hour))
+ row = [data_ts, '%s' % (account)]
for k in sorted_keylist_mapping:
- row.append('%s' % d[k])
- out_buf.append(','.join(row))
- out_buf = '\n'.join(out_buf)
+ row.append(str(d[k]))
+ output.append(row)
+ return output
+
+ def store_output(self, output):
+ """
+ Takes a list of rows and stores a csv file of the values in the
+ stats account.
+
+ :param output: list of rows to appear in the csv file
+
+ This csv file is the final product of this script.
+ """
+
+ out_buf = '\n'.join([','.join(row) for row in output])
h = hashlib.md5(out_buf).hexdigest()
upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
f = cStringIO.StringIO(out_buf)
self.log_processor.internal_proxy.upload_file(f,
- self.log_processor_account,
- self.log_processor_container,
- upload_name)
+ self.log_processor_account,
+ self.log_processor_container,
+ upload_name)
- # cleanup
- s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
- f = cStringIO.StringIO(s)
- self.log_processor.internal_proxy.upload_file(f,
- self.log_processor_account,
- self.log_processor_container,
- 'processed_files.pickle.gz')
+ @property
+ def keylist_mapping(self):
+ """
+ :returns: the keylist mapping.
+
+ The keylist mapping determines how the stats fields are aggregated in
+ the final aggregation step.
+ """
+
+ if self._keylist_mapping is None:
+ self._keylist_mapping = \
+ self.log_processor.generate_keylist_mapping()
+ return self._keylist_mapping
+
+ def process_logs(self, logs_to_process, processed_files):
+ """
+ :param logs_to_process: list of logs to process
+ :param processed_files: set of processed files
+
+ :returns: a list of rows of processed data.
+
+ The first row is the column headers. The rest of the rows contain
+ hourly aggregate data for the account specified in the row.
+
+ Files processed are added to the processed_files set.
+
+ When a large data structure is no longer needed, it is deleted in
+ an effort to conserve memory.
+ """
+
+ # map
+ processor_args = (self.total_conf, self.logger)
+ results = multiprocess_collate(processor_args, logs_to_process,
+ self.worker_count)
+
+ # reduce
+ aggr_data = self.get_aggregate_data(processed_files, results)
+ del results
+
+ # group
+ # reduce a large number of keys in aggr_data[k] to a small
+ # number of output keys
+ final_info = self.get_final_info(aggr_data)
+ del aggr_data
+
+ # output
+ return self.get_output(final_info)
+
+ def run_once(self, *args, **kwargs):
+ """
+ Process log files that fall within the lookback interval.
+
+ Upload resulting csv file to stats account.
+
+ Update processed files list and upload to stats account.
+ """
+
+ for k in 'lookback_hours lookback_window'.split():
+ if k in kwargs and kwargs[k] is not None:
+ setattr(self, k, kwargs[k])
+
+ start = time.time()
+ self.logger.info(_("Beginning log processing"))
+
+ lookback_start, lookback_end = self.get_lookback_interval()
+ self.logger.debug('lookback_start: %s' % lookback_start)
+ self.logger.debug('lookback_end: %s' % lookback_end)
+
+ processed_files = self.get_processed_files_list()
+ if processed_files is None:
+ self.logger.error(_('Log processing unable to load list of '
+ 'already processed log files'))
+ return
+ self.logger.debug(_('found %d processed files') %
+ len(processed_files))
+
+ logs_to_process = self.log_processor.get_data_list(lookback_start,
+ lookback_end, processed_files)
+ self.logger.info(_('loaded %d files to process') %
+ len(logs_to_process))
+
+ if logs_to_process:
+ output = self.process_logs(logs_to_process, processed_files)
+ self.store_output(output)
+ del output
+
+ self.store_processed_files_list(processed_files)
self.logger.info(_("Log processing done (%0.2f minutes)") %
- ((time.time() - start) / 60))
+ ((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
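Taken together, run_once() is now a thin pipeline over the helpers introduced above. A minimal sketch of the flow, assuming daemon is a configured LogProcessorDaemon:

    lookback_start, lookback_end = daemon.get_lookback_interval()
    processed_files = daemon.get_processed_files_list()      # None on download error
    if processed_files is not None:
        logs_to_process = daemon.log_processor.get_data_list(
            lookback_start, lookback_end, processed_files)
        if logs_to_process:
            output = daemon.process_logs(logs_to_process, processed_files)
            daemon.store_output(output)                       # csv uploaded to the stats account
        daemon.store_processed_files_list(processed_files)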
diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py
index 80b482256..fa90ec582 100644
--- a/test/unit/stats/test_log_processor.py
+++ b/test/unit/stats/test_log_processor.py
@@ -16,6 +16,10 @@
import unittest
from test.unit import tmpfile
import Queue
+import datetime
+import hashlib
+import pickle
+import time
from swift.common import internal_proxy
from swift.stats import log_processor
@@ -26,7 +30,6 @@ class FakeUploadApp(object):
def __init__(self, *args, **kwargs):
pass
-
class DumbLogger(object):
def __getattr__(self, n):
return self.foo
@@ -77,7 +80,7 @@ class DumbInternalProxy(object):
return self.code, data()
class TestLogProcessor(unittest.TestCase):
-
+
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
'09/Jul/2010/04/14/30 GET '\
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
@@ -85,7 +88,7 @@ class TestLogProcessor(unittest.TestCase):
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
stats_test_line = 'account,1,2,3'
proxy_config = {'log-processor': {
-
+
}
}
@@ -426,3 +429,407 @@ use = egg:swift#proxy
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+class TestLogProcessorDaemon(unittest.TestCase):
+
+ def test_get_lookback_interval(self):
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, lookback_hours, lookback_window):
+ self.lookback_hours = lookback_hours
+ self.lookback_window = lookback_window
+
+ try:
+ d = datetime.datetime
+
+ for x in [
+ [d(2011, 1, 1), 0, 0, None, None],
+ [d(2011, 1, 1), 120, 0, '2010122700', None],
+ [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
+ [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
+ [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
+ [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
+ ]:
+
+ log_processor.now = lambda: x[0]
+
+ d = MockLogProcessorDaemon(x[1], x[2])
+ self.assertEquals((x[3], x[4]), d.get_lookback_interval())
+ finally:
+ log_processor.now = datetime.datetime.now
+
+ def test_get_processed_files_list(self):
+ class MockLogProcessor():
+ def __init__(self, stream):
+ self.stream = stream
+
+ def get_object_data(self, *args, **kwargs):
+ return self.stream
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, stream):
+ self.log_processor = MockLogProcessor(stream)
+ self.log_processor_account = 'account'
+ self.log_processor_container = 'container'
+ self.processed_files_filename = 'filename'
+
+ file_list = set(['a', 'b', 'c'])
+
+ for s, l in [['', None],
+ [pickle.dumps(set()).split('\n'), set()],
+ [pickle.dumps(file_list).split('\n'), file_list],
+ ]:
+
+ self.assertEquals(l,
+ MockLogProcessorDaemon(s).get_processed_files_list())
+
+ def test_get_processed_files_list_bad_file_downloads(self):
+ class MockLogProcessor():
+ def __init__(self, status_code):
+ self.err = log_processor.BadFileDownload(status_code)
+
+ def get_object_data(self, *a, **k):
+ raise self.err
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, status_code):
+ self.log_processor = MockLogProcessor(status_code)
+ self.log_processor_account = 'account'
+ self.log_processor_container = 'container'
+ self.processed_files_filename = 'filename'
+
+ for c, l in [[404, set()], [503, None], [None, None]]:
+ self.assertEquals(l,
+ MockLogProcessorDaemon(c).get_processed_files_list())
+
+ def test_get_aggregate_data(self):
+ # when run "for real"
+ # the various keys/values in the input and output
+ # dictionaries are often not simple strings
+ # for testing we can use keys that are easier to work with
+
+ processed_files = set()
+
+ data_in = [
+ ['file1', {
+ 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
+ 'acct1_time2': {'field1': 4, 'field2': 5},
+ 'acct2_time1': {'field1': 6, 'field2': 7},
+ 'acct3_time3': {'field1': 8, 'field2': 9},
+ }
+ ],
+ ['file2', {'acct1_time1': {'field1': 10}}],
+ ]
+
+ expected_data_out = {
+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
+ 'acct1_time2': {'field1': 4, 'field2': 5},
+ 'acct2_time1': {'field1': 6, 'field2': 7},
+ 'acct3_time3': {'field1': 8, 'field2': 9},
+ }
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self):
+ pass
+
+ d = MockLogProcessorDaemon()
+ data_out = d.get_aggregate_data(processed_files, data_in)
+
+ for k, v in expected_data_out.items():
+ self.assertEquals(v, data_out[k])
+
+ self.assertEquals(set(['file1', 'file2']), processed_files)
+
+ def test_get_final_info(self):
+ # when run "for real"
+ # the various keys/values in the input and output
+ # dictionaries are often not simple strings
+ # for testing we can use keys/values that are easier to work with
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self):
+ self._keylist_mapping = {
+ 'out_field1':['field1', 'field2', 'field3'],
+ 'out_field2':['field2', 'field3'],
+ 'out_field3':['field3'],
+ 'out_field4':'field4',
+ 'out_field5':['field6', 'field7', 'field8'],
+ 'out_field6':['field6'],
+ 'out_field7':'field7',
+ }
+
+ data_in = {
+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
+ 'field4': 8, 'field5': 11},
+ 'acct1_time2': {'field1': 4, 'field2': 5},
+ 'acct2_time1': {'field1': 6, 'field2': 7},
+ 'acct3_time3': {'field1': 8, 'field2': 9},
+ }
+
+ expected_data_out = {
+ 'acct1_time1': {'out_field1': 16, 'out_field2': 5,
+ 'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
+ 'out_field6': 0, 'out_field7': 0,},
+ 'acct1_time2': {'out_field1': 9, 'out_field2': 5,
+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+ 'out_field6': 0, 'out_field7': 0,},
+ 'acct2_time1': {'out_field1': 13, 'out_field2': 7,
+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+ 'out_field6': 0, 'out_field7': 0,},
+ 'acct3_time3': {'out_field1': 17, 'out_field2': 9,
+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+ 'out_field6': 0, 'out_field7': 0,},
+ }
+
+ self.assertEquals(expected_data_out,
+ MockLogProcessorDaemon().get_final_info(data_in))
+
+ def test_store_processed_files_list(self):
+ class MockInternalProxy:
+ def __init__(self, test, daemon, processed_files):
+ self.test = test
+ self.daemon = daemon
+ self.processed_files = processed_files
+
+ def upload_file(self, f, account, container, filename):
+ self.test.assertEquals(self.processed_files,
+ pickle.loads(f.getvalue()))
+ self.test.assertEquals(self.daemon.log_processor_account,
+ account)
+ self.test.assertEquals(self.daemon.log_processor_container,
+ container)
+ self.test.assertEquals(self.daemon.processed_files_filename,
+ filename)
+
+ class MockLogProcessor:
+ def __init__(self, test, daemon, processed_files):
+ self.internal_proxy = MockInternalProxy(test, daemon,
+ processed_files)
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, test, processed_files):
+ self.log_processor = \
+ MockLogProcessor(test, self, processed_files)
+ self.log_processor_account = 'account'
+ self.log_processor_container = 'container'
+ self.processed_files_filename = 'filename'
+
+ processed_files = set(['a', 'b', 'c'])
+ MockLogProcessorDaemon(self, processed_files).\
+ store_processed_files_list(processed_files)
+
+ def test_get_output(self):
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self):
+ self._keylist_mapping = {'a':None, 'b':None, 'c':None}
+
+ data_in = {
+ ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
+ ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
+ ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
+ ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
+ }
+
+ expected_data_out = [
+ ['data_ts', 'account', 'a', 'b', 'c'],
+ ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
+ ]
+
+ data_out = MockLogProcessorDaemon().get_output(data_in)
+ self.assertEquals(expected_data_out[0], data_out[0])
+
+ for row in data_out[1:]:
+ self.assert_(row in expected_data_out)
+
+ for row in expected_data_out[1:]:
+ self.assert_(row in data_out)
+
+ def test_store_output(self):
+ try:
+ real_strftime = time.strftime
+ mock_strftime_return = '2010/03/02/01/'
+ def mock_strftime(format):
+ self.assertEquals('%Y/%m/%d/%H/', format)
+ return mock_strftime_return
+ log_processor.time.strftime = mock_strftime
+
+ data_in = [
+ ['data_ts', 'account', 'a', 'b', 'c'],
+ ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
+ ]
+
+ expected_output = '\n'.join([','.join(row) for row in data_in])
+ h = hashlib.md5(expected_output).hexdigest()
+ expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
+
+ class MockInternalProxy:
+ def __init__(self, test, daemon, expected_filename,
+ expected_output):
+ self.test = test
+ self.daemon = daemon
+ self.expected_filename = expected_filename
+ self.expected_output = expected_output
+
+ def upload_file(self, f, account, container, filename):
+ self.test.assertEquals(self.daemon.log_processor_account,
+ account)
+ self.test.assertEquals(self.daemon.log_processor_container,
+ container)
+ self.test.assertEquals(self.expected_filename, filename)
+ self.test.assertEquals(self.expected_output, f.getvalue())
+
+ class MockLogProcessor:
+ def __init__(self, test, daemon, expected_filename,
+ expected_output):
+ self.internal_proxy = MockInternalProxy(test, daemon,
+ expected_filename, expected_output)
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, test, expected_filename, expected_output):
+ self.log_processor = MockLogProcessor(test, self,
+ expected_filename, expected_output)
+ self.log_processor_account = 'account'
+ self.log_processor_container = 'container'
+ self.processed_files_filename = 'filename'
+
+ MockLogProcessorDaemon(self, expected_filename, expected_output).\
+ store_output(data_in)
+ finally:
+ log_processor.time.strftime = real_strftime
+
+ def test_keylist_mapping(self):
+ # Kind of lame test to see if the property is both
+ # generated by a particular method and cached properly.
+ # The method that actually generates the mapping is
+ # tested elsewhere.
+
+ value_return = 'keylist_mapping'
+ class MockLogProcessor:
+ def __init__(self):
+ self.call_count = 0
+
+ def generate_keylist_mapping(self):
+ self.call_count += 1
+ return value_return
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self):
+ self.log_processor = MockLogProcessor()
+ self._keylist_mapping = None
+
+ d = MockLogProcessorDaemon()
+ self.assertEquals(value_return, d.keylist_mapping)
+ self.assertEquals(value_return, d.keylist_mapping)
+ self.assertEquals(1, d.log_processor.call_count)
+
+ def test_process_logs(self):
+ try:
+ mock_logs_to_process = 'logs_to_process'
+ mock_processed_files = 'processed_files'
+
+ real_multiprocess_collate = log_processor.multiprocess_collate
+ multiprocess_collate_return = 'multiprocess_collate_return'
+
+ get_aggregate_data_return = 'get_aggregate_data_return'
+ get_final_info_return = 'get_final_info_return'
+ get_output_return = 'get_output_return'
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, test):
+ self.test = test
+ self.total_conf = 'total_conf'
+ self.logger = 'logger'
+ self.worker_count = 'worker_count'
+
+ def get_aggregate_data(self, processed_files, results):
+ self.test.assertEquals(mock_processed_files, processed_files)
+ self.test.assertEquals(multiprocess_collate_return, results)
+ return get_aggregate_data_return
+
+ def get_final_info(self, aggr_data):
+ self.test.assertEquals(get_aggregate_data_return, aggr_data)
+ return get_final_info_return
+
+ def get_output(self, final_info):
+ self.test.assertEquals(get_final_info_return, final_info)
+ return get_output_return
+
+ d = MockLogProcessorDaemon(self)
+
+ def mock_multiprocess_collate(processor_args, logs_to_process,
+ worker_count):
+ self.assertEquals(d.total_conf, processor_args[0])
+ self.assertEquals(d.logger, processor_args[1])
+
+ self.assertEquals(mock_logs_to_process, logs_to_process)
+ self.assertEquals(d.worker_count, worker_count)
+
+ return multiprocess_collate_return
+
+ log_processor.multiprocess_collate = mock_multiprocess_collate
+
+ output = d.process_logs(mock_logs_to_process, mock_processed_files)
+ self.assertEquals(get_output_return, output)
+ finally:
+ log_processor.multiprocess_collate = real_multiprocess_collate
+
+ def test_run_once_get_processed_files_list_returns_none(self):
+ class MockLogProcessor:
+ def get_data_list(self, lookback_start, lookback_end,
+ processed_files):
+ raise unittest.TestCase.failureException, \
+ 'Method should not be called'
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self):
+ self.logger = DumbLogger()
+ self.log_processor = MockLogProcessor()
+
+ def get_lookback_interval(self):
+ return None, None
+
+ def get_processed_files_list(self):
+ return None
+
+ MockLogProcessorDaemon().run_once()
+
+ def test_run_once_no_logs_to_process(self):
+ class MockLogProcessor():
+ def __init__(self, daemon, test):
+ self.daemon = daemon
+ self.test = test
+
+ def get_data_list(self, lookback_start, lookback_end,
+ processed_files):
+ self.test.assertEquals(self.daemon.lookback_start,
+ lookback_start)
+ self.test.assertEquals(self.daemon.lookback_end,
+ lookback_end)
+ self.test.assertEquals(self.daemon.processed_files,
+ processed_files)
+ return []
+
+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+ def __init__(self, test):
+ self.logger = DumbLogger()
+ self.log_processor = MockLogProcessor(self, test)
+ self.lookback_start = 'lookback_start'
+ self.lookback_end = 'lookback_end'
+ self.processed_files = ['a', 'b', 'c']
+
+ def get_lookback_interval(self):
+ return self.lookback_start, self.lookback_end
+
+ def get_processed_files_list(self):
+ return self.processed_files
+
+ def process_logs(self, logs_to_process, processed_files):
+ raise unittest.TestCase.failureException, \
+ 'Method should not be called'
+
+ MockLogProcessorDaemon(self).run_once()
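Because run_once() now guards the lookback overrides with a "k in kwargs" check, the refactored daemon can be driven either with its configured lookback values or with one-off overrides. A minimal sketch, assuming the daemon is constructed from a parsed log-processor config as elsewhere in swift:

    daemon = log_processor.LogProcessorDaemon(conf)  # conf: parsed stats config (assumed)
    daemon.run_once()                                # use lookback_hours/lookback_window from conf
    daemon.run_once(lookback_hours=120, lookback_window=24)  # override the lookback for this run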