summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2015-01-16 18:48:13 +0000
committerKim van der Riet <kpvdr@apache.org>2015-01-16 18:48:13 +0000
commitb47672cca797af3bf7075d98dad71321ad29fe41 (patch)
tree6a61f93cb2daae01ac885edcdf8af6f58363ca4b /tools
parent1142cd6bb04d14541240c9a69352937aac6b09ee (diff)
downloadqpid-python-b47672cca797af3bf7075d98dad71321ad29fe41.tar.gz
QPID-5362: Linearstore: No store tools exist for examining the journals - Bugfix and update for the new partition and directory structure changes from QPID-5671 and QPID-6303
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1652490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rw-r--r--tools/src/py/qlslibs/analyze.py (renamed from tools/src/py/qlslibs/anal.py)39
-rw-r--r--tools/src/py/qlslibs/efp.py60
-rw-r--r--tools/src/py/qlslibs/jrnl.py23
-rwxr-xr-xtools/src/py/qpid-qls-analyze16
4 files changed, 82 insertions, 56 deletions
diff --git a/tools/src/py/qlslibs/anal.py b/tools/src/py/qlslibs/analyze.py
index df51c1b2b4..a67e17e426 100644
--- a/tools/src/py/qlslibs/anal.py
+++ b/tools/src/py/qlslibs/analyze.py
@@ -18,7 +18,7 @@
#
"""
-Module: qlslibs.anal
+Module: qlslibs.analyze
Classes for recovery and analysis of a Qpid Linear Store (QLS).
"""
@@ -41,8 +41,8 @@ class HighCounter(object):
return self.num
class JournalRecoveryManager(object):
- TPL_DIR_NAME = 'tpl'
- JRNL_DIR_NAME = 'jrnl'
+ TPL_DIR_NAME = 'tpl2'
+ JRNL_DIR_NAME = 'jrnl2'
def __init__(self, directory, args):
if not os.path.exists(directory):
raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
@@ -55,15 +55,15 @@ class JournalRecoveryManager(object):
def report(self, print_stats_flag):
self._reconcile_transactions(self.prepared_list, self.args.txn)
if self.tpl is not None:
- self.tpl.report(print_stats_flag)
+ self.tpl.report(print_stats_flag, self.args.show_recovered_recs)
for queue_name in sorted(self.journals.keys()):
- self.journals[queue_name].report(print_stats_flag)
+ self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs)
def run(self):
tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
if os.path.exists(tpl_dir):
self.tpl = Journal(tpl_dir, None, self.args)
self.tpl.recover(self.high_rid_counter)
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print
jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
@@ -72,8 +72,9 @@ class JournalRecoveryManager(object):
jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args)
jrnl.recover(self.high_rid_counter)
self.journals[jrnl.get_queue_name()] = jrnl
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print
+ print
def _reconcile_transactions(self, prepared_list, txn_flag):
print 'Transaction reconciliation report:'
print '=================================='
@@ -315,6 +316,7 @@ class Journal(object):
"""
Instance of a Qpid Linear Store (QLS) journal.
"""
+ JRNL_SUFFIX = 'jrnl'
def __init__(self, directory, xid_prepared_list, args):
self.directory = directory
self.queue_name = os.path.basename(directory)
@@ -350,7 +352,7 @@ class Journal(object):
def get_queue_name(self):
return self.queue_name
def recover(self, high_rid_counter):
- print 'Recovering', self.queue_name
+ print 'Recovering %s...' % self.queue_name,
self._analyze_files()
try:
while self._get_next_record(high_rid_counter):
@@ -362,6 +364,7 @@ class Journal(object):
print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
(err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
err.get_expected_fro())
+ print 'done'
def reconcile_transactions(self, prepared_list, txn_flag):
xid_list = self.txn_map.get_xid_list()
if len(xid_list) > 0:
@@ -385,13 +388,13 @@ class Journal(object):
print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
if txn_flag:
self.txn_map.abort(xid)
- def report(self, print_stats_flag):
+ def report(self, print_stats_flag, show_recovered_records):
print 'Journal "%s":' % self.queue_name
print '=' * (11 + len(self.queue_name))
if print_stats_flag:
print str(self.statistics)
- print self.enq_map.report_str(True, True)
- print self.txn_map.report_str(True, True)
+ print self.enq_map.report_str(True, show_recovered_records)
+ print self.txn_map.report_str(True, show_recovered_records)
JournalFile.report_header()
for file_num in sorted(self.files.keys()):
self.files[file_num].report()
@@ -405,7 +408,7 @@ class Journal(object):
def _analyze_files(self):
for dir_entry in os.listdir(self.directory):
dir_entry_bits = dir_entry.split('.')
- if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
+ if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX:
fq_file_name = os.path.join(self.directory, dir_entry)
file_handle = open(fq_file_name)
args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
@@ -413,7 +416,7 @@ class Journal(object):
file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
if file_hdr.is_header_valid(file_hdr):
file_hdr.load(file_handle)
- if file_hdr.is_valid():
+ if file_hdr.is_valid(False):
qlslibs.utils.skip(file_handle,
file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE)
self.files[file_hdr.file_num] = JournalFile(file_hdr)
@@ -430,7 +433,7 @@ class Journal(object):
qlslibs.utils.DEFAULT_DBLK_SIZE
self.fill_to_offset = self.last_record_offset + \
(self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE)
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
(self.current_journal_file.file_header.file_num, self.last_record_offset,
self.num_filler_records_required, self.fill_to_offset)
@@ -460,7 +463,7 @@ class Journal(object):
return False
self.current_journal_file = self.files[file_num]
self.first_rec_flag = True
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
file_header = self.current_journal_file.file_header
print '0x%x:%s' % (file_header.file_num, file_header.to_string())
return True
@@ -480,18 +483,18 @@ class Journal(object):
if isinstance(this_record, qlslibs.jrnl.EnqueueRecord):
ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num, \
this_record.to_string(self.args.show_xids, self.args.show_data))
elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
ok_flag = self._handle_transaction_record(this_record, start_journal_file)
high_rid_counter.check(this_record.record_id)
- if self.args.show_recs or self.args.show_all_recs:
+ if self.args.show_recovery_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
else:
self.statistics.filler_record_count += 1
diff --git a/tools/src/py/qlslibs/efp.py b/tools/src/py/qlslibs/efp.py
index 0fa27b3519..1c751c3d06 100644
--- a/tools/src/py/qlslibs/efp.py
+++ b/tools/src/py/qlslibs/efp.py
@@ -85,15 +85,16 @@ class EfpManager(object):
self.efp_partitions.remove(self.current_efp_partition)
shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name))
def report(self):
- print 'Empty File Pool (EFP) report:'
- print '============================='
- print 'Found', len(self.efp_partitions), 'partition(s).'
+ print 'Empty File Pool (EFP) report'
+ print '============================'
+ print 'Found', len(self.efp_partitions), 'partition(s)'
if (len(self.efp_partitions)) > 0:
+ sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number)
EfpPartition.print_report_table_header()
- for ptn in self.efp_partitions:
+ for ptn in sorted_efp_partitions:
ptn.print_report_table_line()
print
- for ptn in self.efp_partitions:
+ for ptn in sorted_efp_partitions:
ptn.report()
def run(self, arg_tup):
self._analyze_efp()
@@ -182,9 +183,12 @@ class EfpPartition(object):
self.tot_file_size_kb, self.directory)
def report(self):
print 'Partition %s:' % os.path.basename(self.directory)
- EmptyFilePool.print_report_table_header()
- for dir_name in self.efp_pools.keys():
- self.efp_pools[dir_name].print_report_table_line()
+ if len(self.efp_pools) > 0:
+ EmptyFilePool.print_report_table_header()
+ for dir_name in self.efp_pools.keys():
+ self.efp_pools[dir_name].print_report_table_line()
+ else:
+ print '<empty - no EFPs found in this partition>'
print
def scan(self):
if os.path.exists(self.directory):
@@ -217,13 +221,16 @@ class EmptyFilePool(object):
"""
EFP_DIR_SUFFIX = 'k'
EFP_JRNL_EXTENTION = '.jrnl'
+ EFP_INUSE_DIRNAME = 'in_use'
+ EFP_RETURNED_DIRNAME = 'returned'
def __init__(self, directory, partition_number):
self.base_dir_name = os.path.basename(directory)
self.directory = directory
self.partition_number = partition_number
self.data_size_kb = None
- self.files = []
- self.tot_file_size_kb = 0
+ self.efp_files = []
+ self.in_use_files = []
+ self.returned_files = []
self._validate_efp_directory()
def create_new_efp_files(self, num_files):
""" Create one or more new empty journal files of the prescribed size for this EFP """
@@ -238,24 +245,37 @@ class EmptyFilePool(object):
""" Static function to create an EFP directory name from the size of the files it contains """
return '%dk' % file_size_kb
def get_tot_file_count(self):
- return len(self.files)
+ return len(self.efp_files)
def get_tot_file_size_kb(self):
- return self.data_size_kb * len(self.files)
+ return self.data_size_kb * len(self.efp_files)
@staticmethod
def print_report_table_header():
- print 'data_size_kb file_count tot_file_size_kb efp_directory'
- print '------------ ---------- ---------------- -------------'
+ print ' ---------- efp ------------ --------- in_use ---------- -------- returned ---------'
+ print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory'
+ print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------'
def print_report_table_line(self):
- print '%12d %10d %16d %s' % (self.data_size_kb, self.get_tot_file_count(),
- self.get_tot_file_size_kb(), self.get_directory())
+ print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files),
+ self.data_size_kb * len(self.efp_files),
+ len(self.in_use_files),
+ self.data_size_kb * len(self.in_use_files),
+ len(self.returned_files),
+ self.data_size_kb * len(self.returned_files),
+ self.get_directory())
def scan(self):
for efp_file in os.listdir(self.directory):
+ if efp_file == self.EFP_INUSE_DIRNAME:
+ for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)):
+ self.in_use_files.append(in_use_file)
+ continue
+ if efp_file == self.EFP_RETURNED_DIRNAME:
+ for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)):
+ self.returned_files.append(returned_file)
+ continue
if self._validate_efp_file(os.path.join(self.directory, efp_file)):
- self.files.append(efp_file)
+ self.efp_files.append(efp_file)
def _add_efp_file(self, efp_file_name):
""" Add a single journal file of the appropriate size to this EFP. No file size check is made here. """
- self.files.append(efp_file_name)
- self.tot_file_size_kb += os.path.getsize(efp_file_name)
+ self.efp_files.append(efp_file_name)
def _create_new_efp_file(self):
""" Create a single new empty journal file of the prescribed size for this EFP """
file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
@@ -296,7 +316,7 @@ class EmptyFilePool(object):
return False
file_hdr.load(file_handle)
file_handle.close()
- if not file_hdr.is_valid():
+ if not file_hdr.is_valid(True):
return False
return True
diff --git a/tools/src/py/qlslibs/jrnl.py b/tools/src/py/qlslibs/jrnl.py
index 555cf6a8ae..ee25015220 100644
--- a/tools/src/py/qlslibs/jrnl.py
+++ b/tools/src/py/qlslibs/jrnl.py
@@ -155,19 +155,24 @@ class FileHeader(RecordHeader):
self.queue_name = file_handle.read(self.queue_name_len)
def is_end_of_file(self):
return self.file_handle.tell() >= self.get_file_size()
- def is_valid(self):
+ def is_valid(self, is_empty):
if not RecordHeader.is_header_valid(self, self):
return False
if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \
- self.efp_data_size_kb == 0 or self.first_record_offset == 0 or self.timestamp_sec == 0 or \
- self.timestamp_ns == 0 or self.file_num == 0:
- return False
- if self.queue_name_len == 0:
- return False
- if self.queue_name is None:
- return False
- if len(self.queue_name) != self.queue_name_len:
+ self.efp_data_size_kb == 0:
return False
+ if is_empty:
+ if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \
+ self.file_num != 0 or self.queue_name_len != 0:
+ return False
+ else:
+ if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \
+ self.file_num == 0 or self.queue_name_len == 0:
+ return False
+ if self.queue_name is None:
+ return False
+ if len(self.queue_name) != self.queue_name_len:
+ return False
return True
def timestamp_str(self):
"""Get the timestamp of this record in string format"""
diff --git a/tools/src/py/qpid-qls-analyze b/tools/src/py/qpid-qls-analyze
index 5c7d04b4c0..35b370f4a3 100755
--- a/tools/src/py/qpid-qls-analyze
+++ b/tools/src/py/qpid-qls-analyze
@@ -28,11 +28,11 @@ import sys
default = os.path.normpath('/usr/share/qpid-tools')
home = os.environ.get('QPID_TOOLS_HOME', default)
-sys.path.append(os.path.join(home,'python'))
+sys.path.append(os.path.join(home, 'python'))
import argparse
import os
-import qlslibs.anal
+import qlslibs.analyze
import qlslibs.efp
class QlsAnalyzerArgParser(argparse.ArgumentParser):
@@ -45,7 +45,9 @@ class QlsAnalyzerArgParser(argparse.ArgumentParser):
help='Qpid Linear Store (QLS) directory to be analyzed')
self.add_argument('--efp', action='store_true',
help='Analyze the Emtpy File Pool (EFP) and show stats')
- self.add_argument('--show-recs', action='store_true',
+ self.add_argument('--show-recovered-recs', action='store_true',
+ help='Show only recovered records')
+ self.add_argument('--show-recovery-recs', action='store_true',
help='Show material records found during recovery')
self.add_argument('--show-all-recs', action='store_true',
help='Show all records (including fillers) found during recovery')
@@ -72,13 +74,13 @@ class QqpdLinearStoreAnalyzer(object):
* The Linear Store
* The Transaction Prepared List (TPL)
"""
- QLS_ANALYZE_VERSION = '0.1'
+ QLS_ANALYZE_VERSION = '1.0'
def __init__(self):
self.args = None
self._process_args()
self.qls_dir = os.path.abspath(self.args.qls_dir)
self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None)
- self.jrnl_recovery_mgr = qlslibs.anal.JournalRecoveryManager(self.qls_dir, self.args)
+ self.jrnl_recovery_mgr = qlslibs.analyze.JournalRecoveryManager(self.qls_dir, self.args)
def _process_args(self):
""" Create arg parser and process args """
parser = QlsAnalyzerArgParser()
@@ -101,10 +103,6 @@ class QqpdLinearStoreAnalyzer(object):
#==============================================================================
if __name__ == "__main__":
- # TODO: Remove this in due course
- print 'WARNING: This program is still a work in progress and is largely untested.'
- print '* USE AT YOUR OWN RISK *'
- print
M = QqpdLinearStoreAnalyzer()
M.run()
M.report()